This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new d19f505c84 [SYSTEMDS-2229] Extended I/O Framework: Readers/Writers for Parquet d19f505c84 is described below commit d19f505c844dfeb0dc5933653a06deb29bc61bfd Author: sayedkeika <adnan.elge...@campus.tu-berlin.de> AuthorDate: Fri Apr 18 14:59:26 2025 +0200 [SYSTEMDS-2229] Extended I/O Framework: Readers/Writers for Parquet Closes #2229. --- src/main/java/org/apache/sysds/common/Types.java | 1 + .../sysds/runtime/io/FrameReaderParquet.java | 157 +++++++++++++++ .../runtime/io/FrameReaderParquetParallel.java | 118 +++++++++++ .../sysds/runtime/io/FrameWriterParquet.java | 199 ++++++++++++++++++ .../runtime/io/FrameWriterParquetParallel.java | 120 +++++++++++ .../sysds/test/functions/io/SeqParReadTest2.java | 10 + .../io/parquet/FrameParquetSchemaTest.java | 223 +++++++++++++++++++++ 7 files changed, 828 insertions(+) diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java index c9820a2c09..e6d5f8e945 100644 --- a/src/main/java/org/apache/sysds/common/Types.java +++ b/src/main/java/org/apache/sysds/common/Types.java @@ -868,6 +868,7 @@ public interface Types { PROTO, // protocol buffer representation HDF5, // Hierarchical Data Format (HDF) COG, // Cloud-optimized GeoTIFF + PARQUET, // parquet format for columnar data storage UNKNOWN; public boolean isIJV() { diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java new file mode 100644 index 0000000000..ff23e9ea31 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.util.HDFSTool; + +/** + * Single-threaded frame parquet reader. 
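+ * Reads the Parquet schema from the file footer and decodes values by primitive type (INT32, INT64, FLOAT, DOUBLE, BOOLEAN, BINARY as UTF8 strings) into the output FrameBlock.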
+ * + */ public class FrameReaderParquet extends FrameReader { + + /** + * Reads a Parquet file from HDFS and converts it into a FrameBlock. + * + * @param fname The HDFS file path to the Parquet file. + * @param schema The expected data types of the columns. + * @param names The names of the columns. + * @param rlen The expected number of rows. + * @param clen The expected number of columns. + * @return A FrameBlock containing the data read from the Parquet file. + */ + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException { + // Prepare file access + Configuration conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + + // Check that the file exists on HDFS + if (!HDFSTool.existsFileOnHDFS(path.toString())) { + throw new IOException("File does not exist on HDFS: " + fname); + } + + // Allocate output frame block + ValueType[] lschema = createOutputSchema(schema, clen); + String[] lnames = createOutputNames(names, clen); + FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen); + + // Read Parquet file + readParquetFrameFromHDFS(path, conf, ret, lschema, rlen, clen); + + return ret; + } + + /** + * Reads data from a Parquet file on HDFS and fills the provided FrameBlock. + * The method retrieves the Parquet schema from the file footer, maps the required column names + * to their corresponding indices, and then uses a ParquetReader to iterate over each row. + * Data is extracted based on the column type and set into the output FrameBlock. + * + * @param path The HDFS path to the Parquet file. + * @param conf The Hadoop configuration. + * @param dest The FrameBlock to populate with data. + * @param schema The expected value types for the output columns. + * @param rlen The expected number of rows. + * @param clen The expected number of columns.
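+ *
+ * A minimal caller sketch through the public API (hypothetical path and metadata; assumes the
+ * file was written with a matching schema and column names):
+ * <pre>{@code
+ * FrameReader reader = new FrameReaderParquet();
+ * ValueType[] schema = new ValueType[] {ValueType.INT64, ValueType.STRING};
+ * FrameBlock fb = reader.readFrameFromHDFS("hdfs:/tmp/F.parquet",
+ *     schema, new String[] {"C1", "C2"}, 100, 2);
+ * }</pre>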
+ */ + protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException { + // Retrieve schema from Parquet footer + ParquetMetadata metadata = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf)).getFooter(); + MessageType parquetSchema = metadata.getFileMetaData().getSchema(); + + // Map column names to Parquet schema indices + String[] columnNames = dest.getColumnNames(); + int[] columnIndices = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + columnIndices[i] = parquetSchema.getFieldIndex(columnNames[i]); + } + + // Read data using ParquetReader + try (ParquetReader<Group> rowReader = ParquetReader.builder(new GroupReadSupport(), path) + .withConf(conf) + .build()) { + + Group group; + int row = 0; + while ((group = rowReader.read()) != null) { + for (int col = 0; col < clen; col++) { + int colIndex = columnIndices[col]; + if (group.getFieldRepetitionCount(colIndex) > 0) { + PrimitiveType.PrimitiveTypeName type = parquetSchema.getType(columnNames[col]).asPrimitiveType().getPrimitiveTypeName(); + switch (type) { + case INT32: + dest.set(row, col, group.getInteger(colIndex, 0)); + break; + case INT64: + dest.set(row, col, group.getLong(colIndex, 0)); + break; + case FLOAT: + dest.set(row, col, group.getFloat(colIndex, 0)); + break; + case DOUBLE: + dest.set(row, col, group.getDouble(colIndex, 0)); + break; + case BOOLEAN: + dest.set(row, col, group.getBoolean(colIndex, 0)); + break; + case BINARY: + dest.set(row, col, group.getBinary(colIndex, 0).toStringUsingUTF8()); + break; + default: + throw new IOException("Unsupported data type: " + type); + } + } else { + dest.set(row, col, null); + } + } + row++; + } + + // Check frame dimensions + if (row != rlen) { + throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row); + } + } + } + + // Not implemented + @Override + public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException { + throw new UnsupportedOperationException("Unimplemented method 'readFrameFromInputStream'"); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java new file mode 100644 index 0000000000..3d40f53c62 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ package org.apache.sysds.runtime.io; + import java.io.IOException; + import java.util.ArrayList; + import java.util.List; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Future; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.parquet.example.data.Group; + import org.apache.parquet.hadoop.ParquetReader; + import org.apache.parquet.hadoop.example.GroupReadSupport; + import org.apache.sysds.common.Types.ValueType; + import org.apache.sysds.hops.OptimizerUtils; + import org.apache.sysds.runtime.DMLRuntimeException; + import org.apache.sysds.runtime.frame.data.FrameBlock; + import org.apache.sysds.runtime.util.CommonThreadPool; + + /** + * Multi-threaded frame parquet reader. + * + */ public class FrameReaderParquetParallel extends FrameReaderParquet { + + /** + * Reads a Parquet frame in parallel and populates the provided FrameBlock with the data. + * The method retrieves all part-file paths at the given location and then determines + * the number of threads to use based on the number of available files and the configured parallelism. + * A thread pool is created to run a reading task for each file concurrently. + * + * @param path The HDFS path to the Parquet file or the directory containing part files. + * @param conf The Hadoop configuration. + * @param dest The FrameBlock to be updated with the data read from the files. + * @param schema The expected value types for the frame columns. + * @param rlen The expected number of rows. + * @param clen The expected number of columns. + */ + @Override + protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException, DMLRuntimeException { + FileSystem fs = IOUtilFunctions.getFileSystem(path); + Path[] files = IOUtilFunctions.getSequenceFilePaths(fs, path); + int numThreads = Math.min(OptimizerUtils.getParallelBinaryReadParallelism(), files.length); + + // Create and execute read tasks + ExecutorService pool = CommonThreadPool.get(numThreads); + try { + List<ReadFileTask> tasks = new ArrayList<>(); + for (Path file : files) { + tasks.add(new ReadFileTask(file, conf, dest, schema, clen)); + } + + for (Future<Object> task : pool.invokeAll(tasks)) { + task.get(); + } + } catch (Exception e) { + throw new IOException("Failed parallel read of Parquet frame.", e); + } finally { + pool.shutdown(); + } + } + + private class ReadFileTask implements Callable<Object> { + private Path path; + private Configuration conf; + private FrameBlock dest; + @SuppressWarnings("unused") + private ValueType[] schema; + private long clen; + + public ReadFileTask(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long clen) { + this.path = path; + this.conf = conf; + this.dest = dest; + this.schema = schema; + this.clen = clen; + } + + // When executed, the task opens a ParquetReader for the assigned file and iterates over each row, processing every column.
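+ // Present values are extracted as strings via getValueToString and written into the shared destination block; missing values are set to null.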
+ @Override + public Object call() throws Exception { + try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) { + Group group; + int row = 0; + while ((group = reader.read()) != null) { + for (int col = 0; col < clen; col++) { + if (group.getFieldRepetitionCount(col) > 0) { + dest.set(row, col, group.getValueToString(col, 0)); + } else { + dest.set(row, col, null); + } + } + row++; + } + } + return null; + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java new file mode 100644 index 0000000000..ccaeeb56d5 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.common.Types.ValueType; + +/** + * Single-threaded frame parquet writer. + * + */ +public class FrameWriterParquet extends FrameWriter { + + /** + * Writes a FrameBlock to a Parquet file on HDFS. + * + * @param src The FrameBlock containing the data to write. + * @param fname The HDFS file path where the Parquet file will be stored. + * @param rlen The expected number of rows. + * @param clen The expected number of columns. 
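+ *
+ * A minimal usage sketch (hypothetical output path; rlen/clen must match the block's dimensions):
+ * <pre>{@code
+ * FrameWriter writer = new FrameWriterParquet();
+ * writer.writeFrameToHDFS(fb, "hdfs:/tmp/F.parquet", fb.getNumRows(), fb.getNumColumns());
+ * }</pre>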
+ */ + @Override + public final void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen) throws IOException, DMLRuntimeException { + // Prepare file access + JobConf conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + + // If the file already exists on HDFS, remove it + HDFSTool.deleteFileIfExistOnHDFS(path, conf); + + // Check frame dimensions + if (src.getNumRows() != rlen || src.getNumColumns() != clen) { + throw new IOException("Frame dimensions mismatch with metadata: " + src.getNumRows() + "x" + src.getNumColumns() + " vs " + rlen + "x" + clen + "."); + } + + // Write parquet file + writeParquetFrameToHDFS(path, conf, src); + } + + /** + * Writes the FrameBlock data to a Parquet file using a ParquetWriter. + * The method generates a Parquet schema from the FrameBlock metadata, initializes a ParquetWriter with the specified configuration, and + * iterates over each row and column, converting values by type and writing them in batches for improved performance. + * + * @param path The HDFS path where the Parquet file will be written. + * @param conf The Hadoop configuration. + * @param src The FrameBlock containing the data to write. + */ + protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock src) + throws IOException + { + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + + // Create schema based on frame block metadata + MessageType schema = createParquetSchema(src); + + // TODO: Experiment with different batch sizes + int batchSize = 1000; + int rowCount = 0; + + // Write data using ParquetWriter // FIXME: replace example writer? + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path) + .withConf(conf) + .withType(schema) + .withCompressionCodec(ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME) + .withRowGroupSize((long) ParquetWriter.DEFAULT_BLOCK_SIZE) + .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) + .withDictionaryEncoding(true) + .build()) + { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + List<Group> rowBuffer = new ArrayList<>(batchSize); + + for (int i = 0; i < src.getNumRows(); i++) { + Group group = groupFactory.newGroup(); + for (int j = 0; j < src.getNumColumns(); j++) { + Object value = src.get(i, j); + if (value != null) { + ValueType type = src.getSchema()[j]; + switch (type) { + case STRING: + group.add(src.getColumnNames()[j], value.toString()); + break; + case INT32: + group.add(src.getColumnNames()[j], (int) value); + break; + case INT64: + group.add(src.getColumnNames()[j], (long) value); + break; + case FP32: + group.add(src.getColumnNames()[j], (float) value); + break; + case FP64: + group.add(src.getColumnNames()[j], (double) value); + break; + case BOOLEAN: + group.add(src.getColumnNames()[j], (boolean) value); + break; + default: + throw new IOException("Unsupported value type: " + type); + } + } + } + rowBuffer.add(group); + rowCount++; + + if (rowCount >= batchSize) { + for (Group g : rowBuffer) { + writer.write(g); + } + rowBuffer.clear(); + rowCount = 0; + } + } + + for (Group g : rowBuffer) { + writer.write(g); + } + } + + // Delete CRC files created by Hadoop if necessary + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); + } + + /** + * Creates a Parquet schema based on the metadata of a FrameBlock. + * + * @param src The FrameBlock whose metadata is used to create the Parquet schema. + * @return The generated Parquet MessageType schema.
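+ *
+ * For example, a frame with columns C1 of type FP64 and C2 of type STRING yields:
+ * <pre>{@code
+ * message FrameSchema { optional double C1; optional binary C2 (UTF8); }
+ * }</pre>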
+ */ + protected MessageType createParquetSchema(FrameBlock src) { + StringBuilder schemaBuilder = new StringBuilder("message FrameSchema {"); + String[] columnNames = src.getColumnNames(); + ValueType[] columnTypes = src.getSchema(); + + for (int i = 0; i < src.getNumColumns(); i++) { + schemaBuilder.append("optional "); + switch (columnTypes[i]) { + case STRING: + schemaBuilder.append("binary ").append(columnNames[i]).append(" (UTF8);"); + break; + case INT32: + schemaBuilder.append("int32 ").append(columnNames[i]).append(";"); + break; + case INT64: + schemaBuilder.append("int64 ").append(columnNames[i]).append(";"); + break; + case FP32: + schemaBuilder.append("float ").append(columnNames[i]).append(";"); + break; + case FP64: + schemaBuilder.append("double ").append(columnNames[i]).append(";"); + break; + case BOOLEAN: + schemaBuilder.append("boolean ").append(columnNames[i]).append(";"); + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + columnTypes[i]); + } + } + schemaBuilder.append("}"); + return MessageTypeParser.parseMessageType(schemaBuilder.toString()); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java new file mode 100644 index 0000000000..0ef4431ef4 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.sysds.runtime.io; + import java.io.IOException; + import java.util.ArrayList; + import java.util.List; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Future; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.Path; + import org.apache.sysds.conf.DMLConfig; + import org.apache.sysds.hops.OptimizerUtils; + import org.apache.sysds.runtime.DMLRuntimeException; + import org.apache.sysds.runtime.frame.data.FrameBlock; + import org.apache.sysds.runtime.util.CommonThreadPool; + import org.apache.sysds.runtime.util.HDFSTool; + import org.apache.sysds.utils.stats.InfrastructureAnalyzer; + + /** + * Multi-threaded frame parquet writer. + * + */ public class FrameWriterParquetParallel extends FrameWriterParquet { + + /** + * Writes the FrameBlock data to HDFS in parallel. + * The method estimates the number of output partitions by comparing the total number of cells in the FrameBlock with the + * HDFS block size. It then determines the number of threads to use based on the parallelism configuration and the + * number of partitions.
If a parallel write is feasible, it divides the FrameBlock into row chunks and creates a thread pool that + * executes a write task for each partition concurrently. + * + * @param path The HDFS path where the Parquet files will be written. + * @param conf The Hadoop configuration. + * @param src The FrameBlock containing the data to write. + */ + @Override + protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock src) + throws IOException, DMLRuntimeException + { + // Estimate number of output partitions + int numPartFiles = Math.max((int) (src.getNumRows() * src.getNumColumns() / InfrastructureAnalyzer.getHDFSBlockSize()), 1); + + // Determine parallelism + int numThreads = Math.min(OptimizerUtils.getParallelBinaryWriteParallelism(), numPartFiles); + + // Fall back to sequential write if numThreads <= 1 + if (numThreads <= 1) { + super.writeParquetFrameToHDFS(path, conf, src); + return; + } + + // Create directory for concurrent tasks + HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + + // Create and execute write tasks + ExecutorService pool = CommonThreadPool.get(numThreads); + try { + List<WriteFileTask> tasks = new ArrayList<>(); + int chunkSize = (int) Math.ceil((double) src.getNumRows() / numThreads); + + for (int i = 0; i < numThreads; i++) { + int startRow = i * chunkSize; + int endRow = Math.min((i + 1) * chunkSize, (int) src.getNumRows()); + if (startRow < endRow) { + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); + tasks.add(new WriteFileTask(newPath, conf, src.slice(startRow, endRow - 1))); + } + } + + for (Future<Object> task : pool.invokeAll(tasks)) + task.get(); + } catch (Exception e) { + throw new IOException("Failed parallel write of Parquet frame.", e); + } finally { + pool.shutdown(); + } + } + + protected void writeSingleParquetFile(Path path, Configuration conf, FrameBlock src) + throws IOException, DMLRuntimeException + { + super.writeParquetFrameToHDFS(path, conf, src); + } + + private class WriteFileTask implements Callable<Object> { + private Path path; + private Configuration conf; + private FrameBlock src; + + public WriteFileTask(Path path, Configuration conf, FrameBlock src) { + this.path = path; + this.conf = conf; + this.src = src; + } + + @Override + public Object call() throws Exception { + writeSingleParquetFile(path, conf, src); + return null; + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java index 6bdc428a90..a4070a119d 100644 --- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java +++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java @@ -73,6 +73,10 @@ import org.apache.sysds.runtime.io.WriterTextCell; import org.apache.sysds.runtime.io.WriterTextCellParallel; import org.apache.sysds.runtime.io.WriterTextLIBSVM; import org.apache.sysds.runtime.io.WriterTextLIBSVMParallel; +import org.apache.sysds.runtime.io.FrameReaderParquet; +import org.apache.sysds.runtime.io.FrameReaderParquetParallel; +import org.apache.sysds.runtime.io.FrameWriterParquet; +import org.apache.sysds.runtime.io.FrameWriterParquetParallel; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DataConverter; import org.apache.sysds.runtime.util.UtilFunctions; @@ -159,6 +163,10 @@ public class SeqParReadTest2 extends AutomatedTestBase { {true, "libsvm", false, 0.1}, {true, "libsvm", true, 0.7}, {true, "libsvm", true, 0.1}, +
{false, "parquet", false, 0.7}, + {false, "parquet", false, 0.1}, + {false, "parquet", true, 0.7}, + {false, "parquet", true, 0.1}, }; return Arrays.asList(data); } @@ -255,6 +263,7 @@ public class SeqParReadTest2 extends AutomatedTestBase { new FrameWriterTextCSVParallel(new FileFormatPropertiesCSV()) : new FrameWriterTextCSV(new FileFormatPropertiesCSV()); case BINARY: return par ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock(); + case PARQUET: return par ? new FrameWriterParquetParallel() : new FrameWriterParquet(); } return null; } @@ -268,6 +277,7 @@ public class SeqParReadTest2 extends AutomatedTestBase { new FrameReaderTextCSVParallel(new FileFormatPropertiesCSV()) : new FrameReaderTextCSV(new FileFormatPropertiesCSV()); case BINARY: return par ? new FrameReaderBinaryBlockParallel() : new FrameReaderBinaryBlock(); + case PARQUET: return par ? new FrameReaderParquetParallel() : new FrameReaderParquet(); } return null; } diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java new file mode 100644 index 0000000000..dc776c8eab --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io.parquet; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameReader; +import org.apache.sysds.runtime.io.FrameReaderParquet; +import org.apache.sysds.runtime.io.FrameReaderParquetParallel; +import org.apache.sysds.runtime.io.FrameWriter; +import org.apache.sysds.runtime.io.FrameWriterParquet; +import org.apache.sysds.runtime.io.FrameWriterParquetParallel; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; + +import java.io.IOException; + +/** + * This test class verifies that a FrameBlock with different data types is correctly written and read from Parquet files. + * It tests both sequential and parallel implementations. In these tests a FrameBlock is created, populated with sample + * data, written to a Parquet file, and then read back into a new FrameBlock. The test compares the original and read + * data to ensure that schema information is preserved and that data conversion is performed correctly. 
+ */ public class FrameParquetSchemaTest extends AutomatedTestBase { + + private final static String TEST_NAME = "FrameParquetSchemaTest"; + private final static String TEST_DIR = "functions/io/parquet/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameParquetSchemaTest.class.getSimpleName() + "/"; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"Rout"})); + } + + /** + * Test for sequential writer and reader. + * + */ + @Test + public void testParquetWriteReadAllSchemaTypes() { + String fname = output("Rout"); + + // Define a schema with one column per type + ValueType[] schema = new ValueType[] { + ValueType.FP64, + ValueType.FP32, + ValueType.INT32, + ValueType.INT64, + ValueType.BOOLEAN, + ValueType.STRING + }; + + // Create an empty frame block with the above schema + FrameBlock fb = new FrameBlock(schema); + + // Populate frame block + Object[][] rows = new Object[][] { + { 1.0, 1.1f, 10, 100L, true, "A" }, + { 2.0, 2.1f, 20, 200L, false, "B" }, + { 3.0, 3.1f, 30, 300L, true, "C" }, + { 4.0, 4.1f, 40, 400L, false, "D" }, + { 5.0, 5.1f, 50, 500L, true, "E" } + }; + + for (Object[] row : rows) { + fb.appendRow(row); + } + + int numRows = fb.getNumRows(); + int numCols = fb.getNumColumns(); + + // Write the FrameBlock to a Parquet file using the sequential writer + try { + FrameWriter writer = new FrameWriterParquet(); + writer.writeFrameToHDFS(fb, fname, numRows, numCols); + } + catch (IOException e) { + e.printStackTrace(); + Assert.fail("Failed to write frame block to Parquet: " + e.getMessage()); + } + + // Read the Parquet file back into a new FrameBlock + FrameBlock fbRead = null; + try { + FrameReader reader = new FrameReaderParquet(); + String[] colNames = fb.getColumnNames(); + fbRead = reader.readFrameFromHDFS(fname, schema, colNames, numRows, numCols); + } + catch (IOException e) { + e.printStackTrace(); + Assert.fail("Failed to read frame block from Parquet: " + e.getMessage()); + } + + // Compare the original and the read frame blocks + compareFrameBlocks(fb, fbRead, 1e-6); + } + + /** + * Test for multithreaded writer and reader. + * + */ + @Test + public void testParquetWriteReadAllSchemaTypesParallel() { + String fname = output("Rout_parallel"); + + ValueType[] schema = new ValueType[] { + ValueType.FP64, + ValueType.FP32, + ValueType.INT32, + ValueType.INT64, + ValueType.BOOLEAN, + ValueType.STRING + }; + + FrameBlock fb = new FrameBlock(schema); + + Object[][] rows = new Object[][] { + { 1.0, 1.1f, 10, 100L, true, "A" }, + { 2.0, 2.1f, 20, 200L, false, "B" }, + { 3.0, 3.1f, 30, 300L, true, "C" }, + { 4.0, 4.1f, 40, 400L, false, "D" }, + { 5.0, 5.1f, 50, 500L, true, "E" } + }; + + for (Object[] row : rows) { + fb.appendRow(row); + } + + int numRows = fb.getNumRows(); + int numCols = fb.getNumColumns(); + + try { + FrameWriter writer = new FrameWriterParquetParallel(); + writer.writeFrameToHDFS(fb, fname, numRows, numCols); + } + catch (IOException e) { + e.printStackTrace(); + Assert.fail("Failed to write frame block to Parquet (parallel): " + e.getMessage()); + } + + FrameBlock fbRead = null; + try { + FrameReader reader = new FrameReaderParquetParallel(); + String[] colNames = fb.getColumnNames(); + fbRead = reader.readFrameFromHDFS(fname, schema, colNames, numRows, numCols); + } + catch (IOException e) { + e.printStackTrace(); + Assert.fail("Failed to read frame block from Parquet (parallel): " + e.getMessage()); + } + +
compareFrameBlocks(fb, fbRead, 1e-6); + } + + private void compareFrameBlocks(FrameBlock expected, FrameBlock actual, double eps) { + Assert.assertEquals("Number of rows mismatch", expected.getNumRows(), actual.getNumRows()); + Assert.assertEquals("Number of columns mismatch", expected.getNumColumns(), actual.getNumColumns()); + + int rows = expected.getNumRows(); + int cols = expected.getNumColumns(); + + for (int i = 0; i < rows; i++) { + for (int j = 0; j < cols; j++) { + Object expVal = expected.get(i, j); + Object actVal = actual.get(i, j); + ValueType vt = expected.getSchema()[j]; + + // Handle nulls first + if(expVal == null || actVal == null) { + Assert.assertEquals("Mismatch at (" + i + "," + j + ")", expVal, actVal); + } else { + switch(vt) { + case FP64: + case FP32: + double dExp = ((Number) expVal).doubleValue(); + double dAct = ((Number) actVal).doubleValue(); + Assert.assertEquals("Mismatch at (" + i + "," + j + ")", dExp, dAct, eps); + break; + case INT32: + case INT64: + long lExp = ((Number) expVal).longValue(); + long lAct = ((Number) actVal).longValue(); + Assert.assertEquals("Mismatch at (" + i + "," + j + ")", lExp, lAct); + break; + case BOOLEAN: + boolean bExp = (Boolean) expVal; + boolean bAct = (Boolean) actVal; + Assert.assertEquals("Mismatch at (" + i + "," + j + ")", bExp, bAct); + break; + case STRING: + Assert.assertEquals("Mismatch at (" + i + "," + j + ")", expVal.toString(), actVal.toString()); + break; + default: + Assert.fail("Unsupported type in comparison: " + vt); + } + } + } + } + } +}
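For reference, a minimal round-trip sketch of the new Parquet frame I/O (mirroring FrameParquetSchemaTest; the output path is hypothetical and assumes a reachable local or HDFS file system):

    import org.apache.sysds.common.Types.ValueType;
    import org.apache.sysds.runtime.frame.data.FrameBlock;
    import org.apache.sysds.runtime.io.FrameReaderParquetParallel;
    import org.apache.sysds.runtime.io.FrameWriterParquetParallel;

    public class ParquetRoundTrip {
        public static void main(String[] args) throws Exception {
            // Build a small frame with a mixed schema
            ValueType[] schema = new ValueType[] {ValueType.INT64, ValueType.FP64, ValueType.STRING};
            FrameBlock fb = new FrameBlock(schema);
            fb.appendRow(new Object[] {1L, 1.5, "A"});
            fb.appendRow(new Object[] {2L, 2.5, "B"});

            // Write with the multi-threaded writer (falls back to sequential for small frames)
            new FrameWriterParquetParallel().writeFrameToHDFS(
                fb, "/tmp/F.parquet", fb.getNumRows(), fb.getNumColumns());

            // Read back with the multi-threaded reader, passing the expected schema and column names
            FrameBlock fb2 = new FrameReaderParquetParallel().readFrameFromHDFS(
                "/tmp/F.parquet", schema, fb.getColumnNames(), fb.getNumRows(), fb.getNumColumns());
            System.out.println(fb2.getNumRows() + " x " + fb2.getNumColumns());
        }
    }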