This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new d19f505c84 [SYSTEMDS-2229] Extended I/O Framework: Readers/Writers for 
Parquet
d19f505c84 is described below

commit d19f505c844dfeb0dc5933653a06deb29bc61bfd
Author: sayedkeika <adnan.elge...@campus.tu-berlin.de>
AuthorDate: Fri Apr 18 14:59:26 2025 +0200

    [SYSTEMDS-2229] Extended I/O Framework: Readers/Writers for Parquet
    
    Closes #2229.
---
 src/main/java/org/apache/sysds/common/Types.java   |   1 +
 .../sysds/runtime/io/FrameReaderParquet.java       | 157 +++++++++++++++
 .../runtime/io/FrameReaderParquetParallel.java     | 118 +++++++++++
 .../sysds/runtime/io/FrameWriterParquet.java       | 199 ++++++++++++++++++
 .../runtime/io/FrameWriterParquetParallel.java     | 120 +++++++++++
 .../sysds/test/functions/io/SeqParReadTest2.java   |  10 +
 .../io/parquet/FrameParquetSchemaTest.java         | 223 +++++++++++++++++++++
 7 files changed, 828 insertions(+)

diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index c9820a2c09..e6d5f8e945 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -868,6 +868,7 @@ public interface Types {
                PROTO,  // protocol buffer representation
                HDF5,   // Hierarchical Data Format (HDF)
                COG,   // Cloud-optimized GeoTIFF
+               PARQUET, // parquet format for columnar data storage
                UNKNOWN;
                
                public boolean isIJV() {
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java
new file mode 100644
index 0000000000..ff23e9ea31
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+/**
+ * Single-threaded frame parquet reader.
+ * 
+ */
+public class FrameReaderParquet extends FrameReader {
+
+       /**
+        * Reads a Parquet file from HDFS and converts it into a FrameBlock.
+        *
+        * @param fname  The HDFS file path to the Parquet file.
+        * @param schema The expected data types of the columns.
+        * @param names  The names of the columns.
+        * @param rlen   The expected number of rows.
+        * @param clen   The expected number of columns.
+        * @return A FrameBlock containing the data read from the Parquet file.
+        */
+       @Override
+       public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, 
String[] names, long rlen, long clen) throws IOException, DMLRuntimeException {
+               // Prepare file access
+               Configuration conf = ConfigurationManager.getCachedJobConf();
+               Path path = new Path(fname);
+
+               // Check existence and non-empty file
+               if (!HDFSTool.existsFileOnHDFS(path.toString())) {
+                       throw new IOException("File does not exist on HDFS: " + 
fname);
+               }
+
+               // Allocate output frame block
+               ValueType[] lschema = createOutputSchema(schema, clen);
+               String[] lnames = createOutputNames(names, clen);
+               FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
+
+               // Read Parquet file
+               readParquetFrameFromHDFS(path, conf, ret, lschema, rlen, clen);
+
+               return ret;
+       }
+
+       /**
+        * Reads data from a Parquet file on HDFS and fills the provided 
FrameBlock.
+        * The method retrieves the Parquet schema from the file footer, maps 
the required column names
+        * to their corresponding indices, and then uses a ParquetReader to 
iterate over each row.
+        * Data is extracted based on the column type and set into the output 
FrameBlock.
+        *
+        * @param path   The HDFS path to the Parquet file.
+        * @param conf   The Hadoop configuration.
+        * @param dest   The FrameBlock to populate with data.
+        * @param schema The expected value types for the output columns.
+        * @param rlen   The expected number of rows.
+        * @param clen   The expected number of columns.
+        */
+       protected void readParquetFrameFromHDFS(Path path, Configuration conf, 
FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException {
+               // Retrieve schema from Parquet footer
+               ParquetMetadata metadata = 
ParquetFileReader.open(HadoopInputFile.fromPath(path, conf)).getFooter();
+               MessageType parquetSchema = 
metadata.getFileMetaData().getSchema();
+
+               // Map column names to Parquet schema indices
+               String[] columnNames = dest.getColumnNames();
+               int[] columnIndices = new int[columnNames.length];
+               for (int i = 0; i < columnNames.length; i++) {
+                       columnIndices[i] = 
parquetSchema.getFieldIndex(columnNames[i]);
+               }
+
+               // Read data usind ParquetReader
+               try (ParquetReader<Group> rowReader = ParquetReader.builder(new 
GroupReadSupport(), path)
+                               .withConf(conf)
+                               .build()) {
+
+                       Group group;
+                       int row = 0;
+                       while ((group = rowReader.read()) != null) {
+                               for (int col = 0; col < clen; col++) {
+                                       int colIndex = columnIndices[col];
+                                       if 
(group.getFieldRepetitionCount(colIndex) > 0) {
+                                               PrimitiveType.PrimitiveTypeName 
type = 
parquetSchema.getType(columnNames[col]).asPrimitiveType().getPrimitiveTypeName();
+                                               switch (type) {
+                                                       case INT32:
+                                                               dest.set(row, 
col, group.getInteger(colIndex, 0));
+                                                               break;
+                                                       case INT64:
+                                                               dest.set(row, 
col, group.getLong(colIndex, 0));
+                                                               break;
+                                                       case FLOAT:
+                                                               dest.set(row, 
col, group.getFloat(colIndex, 0));
+                                                               break;
+                                                       case DOUBLE:
+                                                               dest.set(row, 
col, group.getDouble(colIndex, 0));
+                                                               break;
+                                                       case BOOLEAN:
+                                                               dest.set(row, 
col, group.getBoolean(colIndex, 0));
+                                                               break;
+                                                       case BINARY:
+                                                               dest.set(row, 
col, group.getBinary(colIndex, 0).toStringUsingUTF8());
+                                                               break;
+                                                       default:
+                                                               throw new 
IOException("Unsupported data type: " + type);
+                                               }
+                                       } else {
+                                               dest.set(row, col, null);
+                                       }
+                               }
+                               row++;
+                       }
+
+                       // Check frame dimensions
+                       if (row != rlen) {
+                               throw new IOException("Mismatch in row count: 
expected " + rlen + ", but got " + row);
+                       }
+               }
+       }
+
+       //not implemented
+       @Override
+       public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] 
schema, String[] names, long rlen, long clen)
+                       throws IOException, DMLRuntimeException {
+               throw new UnsupportedOperationException("Unimplemented method 
'readFrameFromInputStream'");
+       }
+}
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java
new file mode 100644
index 0000000000..3d40f53c62
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+/**
+ * Multi-threaded frame parquet reader.
+ * 
+ */
+public class FrameReaderParquetParallel extends FrameReaderParquet {
+       
+       /**
+        * Reads a Parquet frame in parallel and populates the provided 
FrameBlock with the data.
+        * The method retrieves all file paths from the sequence files at that 
location, it then determines 
+        * the number of threads to use based on the available files and a 
configured parallelism setting.
+        * A thread pool is created to run a reading task for each file 
concurrently.
+        *
+        * @param path   The HDFS path to the Parquet file or the directory 
containing sequence files.
+        * @param conf   The Hadoop configuration.
+        * @param dest   The FrameBlock to be updated with the data read from 
the files.
+        * @param schema The expected value types for the frame columns.
+        * @param rlen   The expected number of rows.
+        * @param clen   The expected number of columns.
+        */
+       @Override
+       protected void readParquetFrameFromHDFS(Path path, Configuration conf, 
FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException, 
DMLRuntimeException {
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               Path[] files = IOUtilFunctions.getSequenceFilePaths(fs, path);
+               int numThreads = 
Math.min(OptimizerUtils.getParallelBinaryReadParallelism(), files.length);
+               
+               // Create and execute read tasks
+               ExecutorService pool = CommonThreadPool.get(numThreads);
+               try {
+                       List<ReadFileTask> tasks = new ArrayList<>();
+                       for (Path file : files) {
+                               tasks.add(new ReadFileTask(file, conf, dest, 
schema, clen));
+                       }
+
+                       for (Future<Object> task : pool.invokeAll(tasks)) {
+                               task.get();
+                       }
+               } catch (Exception e) {
+                       throw new IOException("Failed parallel read of Parquet 
frame.", e);
+               } finally {
+                       pool.shutdown();
+               }
+       }
+
+       private class ReadFileTask implements Callable<Object> {
+               private Path path;
+               private Configuration conf;
+               private FrameBlock dest;
+               @SuppressWarnings("unused")
+               private ValueType[] schema;
+               private long clen;
+
+               public ReadFileTask(Path path, Configuration conf, FrameBlock 
dest, ValueType[] schema, long clen) {
+                       this.path = path;
+                       this.conf = conf;
+                       this.dest = dest;
+                       this.schema = schema;
+                       this.clen = clen;
+               }
+
+               // When executed, a ParquetReader for the assigned file opens 
and iterates over each row processing every column.
+               @Override
+               public Object call() throws Exception {
+                       try (ParquetReader<Group> reader = 
ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) {
+                               Group group;
+                               int row = 0;
+                               while ((group = reader.read()) != null) {
+                                       for (int col = 0; col < clen; col++) {
+                                               if 
(group.getFieldRepetitionCount(col) > 0) {
+                                                       dest.set(row, col, 
group.getValueToString(col, 0));
+                                               } else {
+                                                       dest.set(row, col, 
null);
+                                               }
+                                       }
+                                       row++;
+                               }
+                       }
+                       return null;
+               }
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java
new file mode 100644
index 0000000000..ccaeeb56d5
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.common.Types.ValueType;
+
+/**
+ * Single-threaded frame parquet writer.
+ * 
+ */
+public class FrameWriterParquet extends FrameWriter {
+
+       /**
+        * Writes a FrameBlock to a Parquet file on HDFS.
+        *
+        * @param src   The FrameBlock containing the data to write.
+        * @param fname The HDFS file path where the Parquet file will be 
stored.
+        * @param rlen  The expected number of rows.
+        * @param clen  The expected number of columns.
+        */
+       @Override
+       public final void writeFrameToHDFS(FrameBlock src, String fname, long 
rlen, long clen) throws IOException, DMLRuntimeException {
+               // Prepare file access
+               JobConf conf = ConfigurationManager.getCachedJobConf();
+               Path path = new Path(fname);
+
+               // If the file already exists on HDFS, remove it
+               HDFSTool.deleteFileIfExistOnHDFS(path, conf);
+               
+               // Check frame dimensions
+               if (src.getNumRows() != rlen || src.getNumColumns() != clen) {
+                       throw new IOException("Frame dimensions mismatch with 
metadata: " + src.getNumRows() + "x" + src.getNumColumns() + " vs " + rlen + 
"x" + clen + ".");
+               }
+
+               // Write parquet file
+               writeParquetFrameToHDFS(path, conf, src);
+       }
+
+       /**
+        * Writes the FrameBlock data to a Parquet file using a ParquetWriter.
+        * The method generates a Parquet schema based on the metadata of the 
FrameBlock, initializes a ParquetWriter with specified configurations, 
+        * iterates over each row and column, adding values (in batches for 
improved performance) using type-specific conversions.
+        *
+        * @param path The HDFS path where the Parquet file will be written.
+        * @param conf The Hadoop configuration.
+        * @param src  The FrameBlock containing the data to write.
+        */
+       protected void writeParquetFrameToHDFS(Path path, Configuration conf, 
FrameBlock src) 
+               throws IOException 
+       {
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+
+               // Create schema based on frame block metadata
+               MessageType schema = createParquetSchema(src);
+
+               // TODO:Experiment with different batch sizes?
+               int batchSize = 1000;  
+               int rowCount = 0;
+
+               // Write data using ParquetWriter //FIXME replace example 
writer? 
+               try (ParquetWriter<Group> writer = 
ExampleParquetWriter.builder(path)
+                               .withConf(conf)
+                               .withType(schema)
+                               
.withCompressionCodec(ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME)
+                               .withRowGroupSize((long) 
ParquetWriter.DEFAULT_BLOCK_SIZE)
+                               .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
+                               .withDictionaryEncoding(true)
+                               .build()) 
+               {
+
+                       SimpleGroupFactory groupFactory = new 
SimpleGroupFactory(schema);
+                       
+                       List<Group> rowBuffer = new ArrayList<>(batchSize);
+                       
+                       for (int i = 0; i < src.getNumRows(); i++) {
+                               Group group = groupFactory.newGroup();
+                               for (int j = 0; j < src.getNumColumns(); j++) {
+                                       Object value = src.get(i, j);
+                                       if (value != null) {
+                                               ValueType type = 
src.getSchema()[j];
+                                               switch (type) {
+                                                       case STRING:
+                                                               
group.add(src.getColumnNames()[j], value.toString());
+                                                               break;
+                                                       case INT32:
+                                                               
group.add(src.getColumnNames()[j], (int) value);
+                                                               break;
+                                                       case INT64:
+                                                               
group.add(src.getColumnNames()[j], (long) value);
+                                                               break;
+                                                       case FP32:
+                                                               
group.add(src.getColumnNames()[j], (float) value);
+                                                               break;
+                                                       case FP64:
+                                                               
group.add(src.getColumnNames()[j], (double) value);
+                                                               break;
+                                                       case BOOLEAN:
+                                                               
group.add(src.getColumnNames()[j], (boolean) value);
+                                                               break;
+                                                       default:
+                                                               throw new 
IOException("Unsupported value type: " + type);
+                                               }
+                                       }
+                               }
+                               rowBuffer.add(group);
+                               rowCount++;
+
+                               if (rowCount >= batchSize) {
+                                       for (Group g : rowBuffer) {
+                                               writer.write(g);
+                                       }
+                                       rowBuffer.clear();
+                                       rowCount = 0;
+                               }
+                       }
+                       
+                       for (Group g : rowBuffer) {
+                               writer.write(g);
+                       }
+               }
+               
+               // Delete CRC files created by Hadoop if necessary
+               IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
+       }
+
+       /**
+        * Creates a Parquet schema based on the metadata of a FrameBlock.
+        *
+        * @param src The FrameBlock whose metadata is used to create the 
Parquet schema.
+        * @return The generated Parquet MessageType schema.
+        */
+       protected MessageType createParquetSchema(FrameBlock src) {
+               StringBuilder schemaBuilder = new StringBuilder("message 
FrameSchema {");
+               String[] columnNames = src.getColumnNames();
+               ValueType[] columnTypes = src.getSchema();
+
+               for (int i = 0; i < src.getNumColumns(); i++) {
+                       schemaBuilder.append("optional ");
+                       switch (columnTypes[i]) {
+                               case STRING:
+                                       schemaBuilder.append("binary 
").append(columnNames[i]).append(" (UTF8);");
+                                       break;
+                               case INT32:
+                                       schemaBuilder.append("int32 
").append(columnNames[i]).append(";");
+                                       break;
+                               case INT64:
+                                       schemaBuilder.append("int64 
").append(columnNames[i]).append(";");
+                                       break;
+                               case FP32:
+                                       schemaBuilder.append("float 
").append(columnNames[i]).append(";");
+                                       break;
+                               case FP64:
+                                       schemaBuilder.append("double 
").append(columnNames[i]).append(";");
+                                       break;
+                               case BOOLEAN:
+                                       schemaBuilder.append("boolean 
").append(columnNames[i]).append(";");
+                                       break;
+                               default:
+                                       throw new 
IllegalArgumentException("Unsupported data type: " + columnTypes[i]);
+                       }
+               }
+               schemaBuilder.append("}");
+               return 
MessageTypeParser.parseMessageType(schemaBuilder.toString());
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java
new file mode 100644
index 0000000000..0ef4431ef4
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
+
+/**
+ * Multi-threaded frame parquet reader.
+ * 
+ */
+public class FrameWriterParquetParallel extends FrameWriterParquet {
+
+       /**
+        * Writes the FrameBlock data to HDFS in parallel. 
+        * The method estimates the number of output partitions by comparing 
the total number of cells in the FrameBlock with the
+        * HDFS block size. It then determines the number of threads to use 
based on the parallelism configuration and the
+        * number of partitions. In case of parallelism, it divides the 
FrameBlock into chunks and a thread pool is created to
+        * execute a write task for each partition concurrently.
+        *
+        * @param path The HDFS path where the Parquet files will be written.
+        * @param conf The Hadoop configuration.
+        * @param src  The FrameBlock containing the data to write.
+        */
+       @Override
+       protected void writeParquetFrameToHDFS(Path path, Configuration conf, 
FrameBlock src) 
+               throws IOException, DMLRuntimeException 
+       {
+               // Estimate number of output partitions
+               int numPartFiles = Math.max((int) (src.getNumRows() * 
src.getNumColumns() / InfrastructureAnalyzer.getHDFSBlockSize()), 1);
+               
+               // Determine parallelism
+               int numThreads = 
Math.min(OptimizerUtils.getParallelBinaryWriteParallelism(), numPartFiles);
+
+               // Fall back to sequential write if numThreads <= 1
+               if (numThreads <= 1) {
+                       super.writeParquetFrameToHDFS(path, conf, src);
+                       return;
+               }
+
+               // Create directory for concurrent tasks
+               HDFSTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+
+               // Create and execute write tasks
+               ExecutorService pool = CommonThreadPool.get(numThreads);
+               try {
+                       List<WriteFileTask> tasks = new ArrayList<>();
+                       int chunkSize = (int) Math.ceil((double) 
src.getNumRows() / numThreads);
+
+                       for (int i = 0; i < numThreads; i++) {
+                               int startRow = i * chunkSize;
+                               int endRow = Math.min((i + 1) * chunkSize, 
(int) src.getNumRows());
+                               if (startRow < endRow) {
+                                       Path newPath = new Path(path, 
IOUtilFunctions.getPartFileName(i));
+                                       tasks.add(new WriteFileTask(newPath, 
conf, src.slice(startRow, endRow - 1)));
+                               }
+                       }
+
+                       for (Future<Object> task : pool.invokeAll(tasks))
+                               task.get();
+               } catch (Exception e) {
+                       throw new IOException("Failed parallel write of Parquet 
frame.", e);
+               } finally {
+                       pool.shutdown();
+               }
+       }
+       
+       protected void writeSingleParquetFile(Path path, Configuration conf, 
FrameBlock src)
+               throws IOException, DMLRuntimeException 
+       {
+               super.writeParquetFrameToHDFS(path, conf, src);
+       }
+       
+       private class WriteFileTask implements Callable<Object> {
+               private Path path;
+               private Configuration conf;
+               private FrameBlock src;
+
+               public WriteFileTask(Path path, Configuration conf, FrameBlock 
src) {
+                       this.path = path;
+                       this.conf = conf;
+                       this.src = src;
+               }
+
+               @Override
+               public Object call() throws Exception {
+                       writeSingleParquetFile(path, conf, src);
+                       return null;
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java 
b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index 6bdc428a90..a4070a119d 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -73,6 +73,10 @@ import org.apache.sysds.runtime.io.WriterTextCell;
 import org.apache.sysds.runtime.io.WriterTextCellParallel;
 import org.apache.sysds.runtime.io.WriterTextLIBSVM;
 import org.apache.sysds.runtime.io.WriterTextLIBSVMParallel;
+import org.apache.sysds.runtime.io.FrameReaderParquet;
+import org.apache.sysds.runtime.io.FrameReaderParquetParallel;
+import org.apache.sysds.runtime.io.FrameWriterParquet;
+import org.apache.sysds.runtime.io.FrameWriterParquetParallel;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.runtime.util.UtilFunctions;
@@ -159,6 +163,10 @@ public class SeqParReadTest2 extends AutomatedTestBase {
                        {true, "libsvm", false, 0.1},
                        {true, "libsvm", true, 0.7},
                        {true, "libsvm", true, 0.1},
+                       {false, "parquet", false, 0.7},
+                       {false, "parquet", false, 0.1},
+                       {false, "parquet", true, 0.7},
+                       {false, "parquet", true, 0.1},
                };
                return Arrays.asList(data);
        }
@@ -255,6 +263,7 @@ public class SeqParReadTest2 extends AutomatedTestBase {
                                                        new 
FrameWriterTextCSVParallel(new FileFormatPropertiesCSV()) :
                                                        new 
FrameWriterTextCSV(new FileFormatPropertiesCSV());
                        case BINARY: return par ? new 
FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock();
+                       case PARQUET: return par ? new 
FrameWriterParquetParallel() : new FrameWriterParquet();
                }
                return null;
        }
@@ -268,6 +277,7 @@ public class SeqParReadTest2 extends AutomatedTestBase {
                                                        new 
FrameReaderTextCSVParallel(new FileFormatPropertiesCSV()) :
                                                        new 
FrameReaderTextCSV(new FileFormatPropertiesCSV());
                        case BINARY: return par ? new 
FrameReaderBinaryBlockParallel() : new FrameReaderBinaryBlock();
+                       case PARQUET: return par ? new 
FrameReaderParquetParallel() : new FrameReaderParquet();
                }
                return null;
        }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java
 
b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java
new file mode 100644
index 0000000000..dc776c8eab
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io.parquet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderParquet;
+import org.apache.sysds.runtime.io.FrameReaderParquetParallel;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterParquet;
+import org.apache.sysds.runtime.io.FrameWriterParquetParallel;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+import java.io.IOException;
+
+/**
+ * This test class verifies that a FrameBlock with different data types is 
correctly written and read from Parquet files. 
+ * It tests both sequential and parallel implementations. In these tests a 
FrameBlock is created, populated with sample 
+ * data, written to a Parquet file, and then read back into a new FrameBlock. 
The test compares the original and read 
+ * data to ensure that schema information is preserved and that data 
conversion is performed correctly.
+ */
+public class FrameParquetSchemaTest extends AutomatedTestBase {
+
+    private final static String TEST_NAME = "FrameParquetSchemaTest";
+    private final static String TEST_DIR = "functions/io/parquet";
+    private final static String TEST_CLASS_DIR = TEST_DIR + 
FrameParquetSchemaTest.class.getSimpleName() + "/";
+
+    @Override
+    public void setUp() {
+        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, 
TEST_NAME, new String[]{"Rout"}));
+    }
+
+
+    /**
+     * Test for sequential writer and reader
+     * 
+     */
+    @Test
+    public void testParquetWriteReadAllSchemaTypes() {
+        String fname = output("Rout");
+
+        // Define a schema with one column per type
+        ValueType[] schema = new ValueType[] {
+            ValueType.FP64,
+            ValueType.FP32,
+            ValueType.INT32,
+            ValueType.INT64,
+            ValueType.BOOLEAN,
+            ValueType.STRING
+        };
+
+        // Create an empty frame block with the above schema
+        FrameBlock fb = new FrameBlock(schema);
+
+        // Populate frame block
+        Object[][] rows = new Object[][] {
+            { 1.0,    1.1f, 10,  100L,  true,  "A" },
+            { 2.0,    2.1f, 20,  200L,  false, "B" },
+            { 3.0,    3.1f, 30,  300L,  true,  "C" },
+            { 4.0,    4.1f, 40,  400L,  false, "D" },
+            { 5.0,    5.1f, 50,  500L,  true,  "E" }
+        };
+
+        for (Object[] row : rows) {
+            fb.appendRow(row);
+        }
+
+        System.out.println(fb);
+
+        int numRows = fb.getNumRows();
+        int numCols = fb.getNumColumns();
+
+        // Write the FrameBlock to a Parquet file using the sequential writer
+        try {
+            FrameWriter writer = new FrameWriterParquet();
+            writer.writeFrameToHDFS(fb, fname, numRows, numCols);
+        } 
+        catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail("Failed to write frame block to Parquet: " + 
e.getMessage());
+        }
+
+        // Read the Parquet file back into a new FrameBlock
+        FrameBlock fbRead = null;
+        try {
+            FrameReader reader = new FrameReaderParquet();
+            String[] colNames = fb.getColumnNames();
+            fbRead = reader.readFrameFromHDFS(fname, schema, colNames, 
numRows, numCols);
+        } 
+        catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail("Failed to read frame block from Parquet: " + 
e.getMessage());
+        }
+
+        // Compare the original and the read frame blocks
+        compareFrameBlocks(fb, fbRead, 1e-6);
+    }
+
+    /**
+     * Test for multithreaded writer and reader
+     * 
+     */
+    @Test
+    public void testParquetWriteReadAllSchemaTypesParallel() {
+        String fname = output("Rout_parallel");
+
+        ValueType[] schema = new ValueType[] {
+            ValueType.FP64,
+            ValueType.FP32,
+            ValueType.INT32,
+            ValueType.INT64,
+            ValueType.BOOLEAN,
+            ValueType.STRING
+        };
+
+        FrameBlock fb = new FrameBlock(schema);
+
+        Object[][] rows = new Object[][] {
+            { 1.0,    1.1f, 10,  100L,  true,  "A" },
+            { 2.0,    2.1f, 20,  200L,  false, "B" },
+            { 3.0,    3.1f, 30,  300L,  true,  "C" },
+            { 4.0,    4.1f, 40,  400L,  false, "D" },
+            { 5.0,    5.1f, 50,  500L,  true,  "E" }
+        };
+
+        for (Object[] row : rows) {
+            fb.appendRow(row);
+        }
+
+        int numRows = fb.getNumRows();
+        int numCols = fb.getNumColumns();
+
+        try {
+            FrameWriter writer = new FrameWriterParquetParallel();
+            writer.writeFrameToHDFS(fb, fname, numRows, numCols);
+        } 
+        catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail("Failed to write frame block to Parquet (parallel): " 
+ e.getMessage());
+        }
+    
+        FrameBlock fbRead = null;
+        try {
+            FrameReader reader = new FrameReaderParquetParallel();
+            String[] colNames = fb.getColumnNames();
+            fbRead = reader.readFrameFromHDFS(fname, schema, colNames, 
numRows, numCols);
+        } 
+        catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail("Failed to read frame block from Parquet (parallel): " 
+ e.getMessage());
+        }
+
+        compareFrameBlocks(fb, fbRead, 1e-6);
+    }
+
+    private void compareFrameBlocks(FrameBlock expected, FrameBlock actual, 
double eps) {
+        Assert.assertEquals("Number of rows mismatch", expected.getNumRows(), 
actual.getNumRows());
+        Assert.assertEquals("Number of columns mismatch", 
expected.getNumColumns(), actual.getNumColumns());
+
+        int rows = expected.getNumRows();
+        int cols = expected.getNumColumns();
+
+        for (int i = 0; i < rows; i++) {
+            for (int j = 0; j < cols; j++) {
+                Object expVal = expected.get(i, j);
+                Object actVal = actual.get(i, j);
+                ValueType vt = expected.getSchema()[j];
+
+                // Handle nulls first
+                if(expVal == null || actVal == null) {
+                    Assert.assertEquals("Mismatch at (" + i + "," + j + ")", 
expVal, actVal);
+                } else {
+                    switch(vt) {
+                        case FP64:
+                        case FP32:
+                            double dExp = ((Number) expVal).doubleValue();
+                            double dAct = ((Number) actVal).doubleValue();
+                            Assert.assertEquals("Mismatch at (" + i + "," + j 
+ ")", dExp, dAct, eps);
+                            break;
+                        case INT32:
+                        case INT64:
+                            long lExp = ((Number) expVal).longValue();
+                            long lAct = ((Number) actVal).longValue();
+                            Assert.assertEquals("Mismatch at (" + i + "," + j 
+ ")", lExp, lAct);
+                            break;
+                        case BOOLEAN:
+                            boolean bExp = (Boolean) expVal;
+                            boolean bAct = (Boolean) actVal;
+                            Assert.assertEquals("Mismatch at (" + i + "," + j 
+ ")", bExp, bAct);
+                            break;
+                        case STRING:
+                            Assert.assertEquals("Mismatch at (" + i + "," + j 
+ ")", expVal.toString(), actVal.toString());
+                            break;
+                        default:
+                            Assert.fail("Unsupported type in comparison: " + 
vt);
+                    }
+                }
+            }
+        }
+    }
+}


Reply via email to