This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new c55afa064e [core] text formats support compress (#6103) c55afa064e is described below commit c55afa064e9ffcdec40ca4a2c1f316cf70649fc1 Author: jerry <lining....@alibaba-inc.com> AuthorDate: Sat Aug 23 13:18:13 2025 +0800 [core] text formats support compress (#6103) --- .../org/apache/paimon/format/CompressionType.java | 137 +++++++++++++ .../apache/paimon/format/FormatReadWriteTest.java | 10 +- .../org/apache/paimon/io/DataFilePathFactory.java | 30 ++- .../apache/paimon/flink/BatchFileStoreITCase.java | 20 ++ .../apache/paimon/format/BaseTextFileReader.java | 23 ++- .../apache/paimon/format/BaseTextFileWriter.java | 35 +++- .../org/apache/paimon/format/TextCompression.java | 137 +++++++++++++ .../apache/paimon/format/csv/CsvFileFormat.java | 19 +- .../apache/paimon/format/csv/CsvFileReader.java | 23 ++- .../apache/paimon/format/csv/CsvFormatWriter.java | 36 ++-- .../apache/paimon/format/csv/CsvReaderFactory.java | 5 +- .../apache/paimon/format/json/JsonFileFormat.java | 21 +- .../apache/paimon/format/json/JsonFileReader.java | 16 +- .../paimon/format/json/JsonFormatWriter.java | 12 +- .../paimon/format/json/JsonReaderFactory.java | 5 +- .../apache/paimon/format/BaseCompressionTest.java | 221 +++++++++++++++++++++ .../paimon/format/csv/CsvCompressionTest.java | 54 +++++ .../paimon/format/csv/CsvFileFormatTest.java | 11 +- .../paimon/format/json/JsonCompressionTest.java | 64 ++++++ .../paimon/format/json/JsonFileFormatTest.java | 11 +- .../org/apache/paimon/spark/SparkWriteITCase.java | 8 +- 21 files changed, 821 insertions(+), 77 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java b/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java new file mode 100644 index 0000000000..236db62fb7 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.options.description.DescribedEnum; +import org.apache.paimon.options.description.InlineElement; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.paimon.options.description.TextElement.text; + +/** Compression types supported by Paimon file formats. */ +public enum CompressionType implements DescribedEnum { + NONE("none", "No compression.", null, ""), + GZIP( + "gzip", + "GZIP compression using the deflate algorithm.", + "org.apache.hadoop.io.compress.GzipCodec", + "gz"), + BZIP2( + "bzip2", + "BZIP2 compression using the Burrows-Wheeler algorithm.", + "org.apache.hadoop.io.compress.BZip2Codec", + "bz2"), + DEFLATE( + "deflate", + "DEFLATE compression using the deflate algorithm.", + "org.apache.hadoop.io.compress.DeflateCodec", + "deflate"), + SNAPPY( + "snappy", + "Snappy compression for fast compression and decompression.", + "org.apache.hadoop.io.compress.SnappyCodec", + "snappy"), + LZ4( + "lz4", + "LZ4 compression for very fast compression and decompression.", + "org.apache.hadoop.io.compress.Lz4Codec", + "lz4"), + ZSTD( + "zstd", + "Zstandard compression for high compression ratio and speed.", + "org.apache.hadoop.io.compress.ZStandardCodec", + "zst"); + + private final String value; + private final String className; + private final String fileExtension; + private final String description; + + private static final Set<String> SUPPORTED_EXTENSIONS; + + static { + Set<String> extensions = new HashSet<>(); + for (CompressionType type : CompressionType.values()) { + if (type != CompressionType.NONE + && type.fileExtension() != null + && !type.fileExtension().isEmpty()) { + extensions.add(type.fileExtension().toLowerCase()); + } + } + SUPPORTED_EXTENSIONS = Collections.unmodifiableSet(extensions); + } + + CompressionType(String value, String description, String className, String fileExtension) { + this.value = value; + this.description = description; + this.className = className; + this.fileExtension = fileExtension; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + + public String value() { + return value; + } + + public String hadoopCodecClassName() { + return className; + } + + public String fileExtension() { + return fileExtension; + } + + public static CompressionType fromValue(String value) { + if (value == null || value.isEmpty()) { + return NONE; + } + + for (CompressionType type : CompressionType.values()) { + if (type.value.equalsIgnoreCase(value)) { + return type; + } + } + return NONE; + } + + /** + * Check if the given extension is a supported compression extension. + * + * @param extension the file extension to check + * @return true if the extension is a supported compression extension, false otherwise + */ + public static boolean isSupportedExtension(String extension) { + if (extension == null || extension.isEmpty()) { + return false; + } + return SUPPORTED_EXTENSIONS.contains(extension.toLowerCase()); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java index 40dcd975fc..5e1f186c85 100644 --- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java @@ -140,6 +140,10 @@ public abstract class FormatReadWriteTest { return true; } + public String compression() { + return "zstd"; + } + @Test public void testNestedReadPruning() throws Exception { if (!supportNestedReadPruning()) { @@ -250,10 +254,10 @@ public abstract class FormatReadWriteTest { FormatWriter writer; PositionOutputStream out = null; if (factory instanceof SupportsDirectWrite) { - writer = ((SupportsDirectWrite) factory).create(fileIO, file, "zstd"); + writer = ((SupportsDirectWrite) factory).create(fileIO, file, this.compression()); } else { out = fileIO.newOutputStream(file, false); - writer = factory.create(out, "zstd"); + writer = factory.create(out, this.compression()); } for (InternalRow row : rows) { writer.addElement(row); @@ -381,7 +385,7 @@ public abstract class FormatReadWriteTest { Path filePath = new Path(parent, UUID.randomUUID().toString()); FormatWriterFactory writerFactory = jsonFormat.createWriterFactory(rowType); try (FormatWriter writer = - writerFactory.create(fileIO.newOutputStream(filePath, false), "none")) { + writerFactory.create(fileIO.newOutputStream(filePath, false), compression())) { for (InternalRow row : testData) { writer.addElement(row); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index e8193866db..933825271f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -19,6 +19,7 @@ package org.apache.paimon.io; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.format.CompressionType; import org.apache.paimon.fs.ExternalPathProvider; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.FileEntry; @@ -88,8 +89,14 @@ public class DataFilePathFactory { private String newFileName(String prefix) { String extension; - if (fileSuffixIncludeCompression) { - extension = "." + fileCompression + "." + formatIdentifier; + if (isTextFormat(formatIdentifier)) { + String compressionExtension = + CompressionType.fromValue(fileCompression).fileExtension(); + extension = "." + formatIdentifier + "." + compressionExtension; + } else if (fileSuffixIncludeCompression) { + String compressionExtension = + CompressionType.fromValue(fileCompression).fileExtension(); + extension = "." + compressionExtension + "." + formatIdentifier; } else { extension = "." + formatIdentifier; } @@ -162,7 +169,19 @@ public class DataFilePathFactory { throw new IllegalArgumentException(fileName + " is not a legal file name."); } - return fileName.substring(index + 1); + String extension = fileName.substring(index + 1); + if (CompressionType.isSupportedExtension(extension)) { + int secondLastDot = fileName.lastIndexOf('.', index - 1); + if (secondLastDot != -1) { + String formatIdentifier = fileName.substring(secondLastDot + 1, index); + // If the format is json or csv, return that instead of the compression extension + if (isTextFormat(formatIdentifier)) { + return formatIdentifier; + } + } + } + + return extension; } public boolean isExternalPath() { @@ -173,4 +192,9 @@ public class DataFilePathFactory { String uuid() { return uuid; } + + private static boolean isTextFormat(String formatIdentifier) { + return "json".equalsIgnoreCase(formatIdentifier) + || "csv".equalsIgnoreCase(formatIdentifier); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index f61f1fa1d7..0c24824ef8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -68,6 +68,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase { sql("CREATE TABLE CSV (a INT, b INT, c INT) WITH ('file.format'='csv')"); sql("INSERT INTO CSV VALUES (1, 2, 3)"); assertThat(sql("SELECT * FROM CSV")).containsExactly(Row.of(1, 2, 3)); + + sql( + "CREATE TABLE CSV_GZIP (a INT, b INT, c INT) WITH ('file.format'='csv', 'file.compression'='gzip')"); + sql("INSERT INTO CSV_GZIP VALUES (1, 2, 3)"); + assertThat(sql("SELECT * FROM CSV_GZIP")).containsExactly(Row.of(1, 2, 3)); + List<String> files = + sql("select file_path from `CSV_GZIP$files`").stream() + .map(r -> r.getField(0).toString()) + .collect(Collectors.toList()); + assertThat(files).allMatch(file -> file.endsWith("csv.gz")); } @Test @@ -75,6 +85,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase { sql("CREATE TABLE JSON_T (a INT, b INT, c INT) WITH ('file.format'='json')"); sql("INSERT INTO JSON_T VALUES (1, 2, 3)"); assertThat(sql("SELECT * FROM JSON_T")).containsExactly(Row.of(1, 2, 3)); + + sql( + "CREATE TABLE JSON_GZIP (a INT, b INT, c INT) WITH ('file.format'='json', 'file.compression'='gzip')"); + sql("INSERT INTO JSON_GZIP VALUES (1, 2, 3)"); + assertThat(sql("SELECT * FROM JSON_GZIP")).containsExactly(Row.of(1, 2, 3)); + List<String> files = + sql("select file_path from `JSON_GZIP$files`").stream() + .map(r -> r.getField(0).toString()) + .collect(Collectors.toList()); + assertThat(files).allMatch(file -> file.endsWith("json.gz")); } @Test diff --git a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java index 3cf2ec92cd..033f9bf65d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java @@ -21,6 +21,7 @@ package org.apache.paimon.format; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.types.RowType; @@ -29,6 +30,7 @@ import javax.annotation.Nullable; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; @@ -37,17 +39,21 @@ public abstract class BaseTextFileReader implements FileRecordReader<InternalRow protected final Path filePath; protected final RowType rowType; + protected final InputStream decompressedStream; protected final BufferedReader bufferedReader; protected boolean readerClosed = false; protected BaseTextRecordIterator reader; - protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType rowType) throws IOException { + protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType rowType, Options options) + throws IOException { this.filePath = filePath; this.rowType = rowType; + this.decompressedStream = + TextCompression.createDecompressedInputStream( + fileIO.newInputStream(filePath), filePath, options); this.bufferedReader = new BufferedReader( - new InputStreamReader( - fileIO.newInputStream(filePath), StandardCharsets.UTF_8)); + new InputStreamReader(this.decompressedStream, StandardCharsets.UTF_8)); this.reader = createRecordIterator(); } @@ -89,8 +95,15 @@ public abstract class BaseTextFileReader implements FileRecordReader<InternalRow @Override public void close() throws IOException { - if (!readerClosed && bufferedReader != null) { - bufferedReader.close(); + if (!readerClosed) { + // Close the buffered reader first + if (bufferedReader != null) { + bufferedReader.close(); + } + // Explicitly close the decompressed stream to prevent resource leaks + if (decompressedStream != null) { + decompressedStream.close(); + } readerClosed = true; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java index 6be2123ed3..f6534afd61 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java @@ -20,10 +20,12 @@ package org.apache.paimon.format; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.options.Options; import org.apache.paimon.types.RowType; import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; @@ -34,11 +36,21 @@ public abstract class BaseTextFileWriter implements FormatWriter { protected final BufferedWriter writer; protected final RowType rowType; - protected BaseTextFileWriter(PositionOutputStream outputStream, RowType rowType) { + protected BaseTextFileWriter( + PositionOutputStream outputStream, + RowType rowType, + Options formatOptions, + CompressionType compressionType) + throws IOException { this.outputStream = outputStream; - this.rowType = rowType; + OutputStream compressedStream = + TextCompression.createCompressedOutputStream( + outputStream, compressionType, formatOptions); this.writer = - new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)); + new BufferedWriter( + new OutputStreamWriter(compressedStream, StandardCharsets.UTF_8), + getOptimalBufferSize(compressionType)); + this.rowType = rowType; } /** @@ -61,4 +73,21 @@ public abstract class BaseTextFileWriter implements FormatWriter { } return false; } + + private int getOptimalBufferSize(CompressionType compressionType) { + switch (compressionType) { + case GZIP: + case DEFLATE: + return 65536; // 64KB for deflate-based compression + case SNAPPY: + case LZ4: + return 131072; // 128KB for fast compression + case ZSTD: + return 262144; // 256KB for high compression ratio + case BZIP2: + return 65536; // 64KB for bzip2 + default: + return 65536; // Default 64KB buffer size + } + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java b/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java new file mode 100644 index 0000000000..4994b2448a --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.HadoopUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Optional; + +/** Utility class for handling text file compression and decompression using Hadoop codecs. */ +public class TextCompression { + + private static final Logger LOG = LoggerFactory.getLogger(TextCompression.class); + /** + * Creates a compressed output stream using Hadoop's compression codecs. + * + * @param out The underlying output stream + * @param compression The compression format + * @param options Paimon options for Hadoop configuration + * @return Compressed output stream + * @throws IOException If compression stream creation fails + */ + public static OutputStream createCompressedOutputStream( + PositionOutputStream out, CompressionType compression, Options options) + throws IOException { + Optional<CompressionCodec> codecOpt = + getCompressionCodecByCompression(compression, options); + if (codecOpt.isPresent()) { + return codecOpt.get().createOutputStream(out); + } + return out; + } + + /** + * Creates a decompressed input stream using Hadoop's compression codecs. + * + * @param inputStream The underlying input stream + * @param filePath The file path (used to detect compression from extension) + * @param options Paimon options for Hadoop configuration + * @return Decompressed input stream + */ + public static InputStream createDecompressedInputStream( + SeekableInputStream inputStream, Path filePath, Options options) throws IOException { + try { + Configuration conf = HadoopUtils.getHadoopConfiguration(options); + CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); + + Optional<CompressionCodec> codecOpt = + Optional.ofNullable( + codecFactory.getCodec( + new org.apache.hadoop.fs.Path(filePath.toString()))); + if (codecOpt.isPresent()) { + return codecOpt.get().createInputStream(inputStream); + } + return inputStream; + } catch (Throwable e) { + throw new IOException("Failed to create decompression stream", e); + } + } + + public static CompressionType getTextCompressionType(String compression, Options options) { + CompressionType compressionType = CompressionType.fromValue(compression); + Optional<CompressionCodec> codecOpt = + getCompressionCodecByCompression(compressionType, options); + if (codecOpt.isPresent()) { + return CompressionType.fromValue(compression); + } + return CompressionType.NONE; + } + + /** + * Gets a compression codec by compression type. + * + * @param compressionType The compression type + * @param options Paimon options for Hadoop configuration + * @return Optional CompressionCodec instance + */ + public static Optional<CompressionCodec> getCompressionCodecByCompression( + CompressionType compressionType, Options options) { + if (compressionType == null || CompressionType.NONE == compressionType) { + return Optional.empty(); + } + + try { + Configuration conf = HadoopUtils.getHadoopConfiguration(options); + String codecName = compressionType.hadoopCodecClassName(); + if (codecName != null) { + Class<?> codecClass = Class.forName(codecName); + if (CompressionCodec.class.isAssignableFrom(codecClass)) { + CompressionCodec codec = + (CompressionCodec) codecClass.getDeclaredConstructor().newInstance(); + if (codec instanceof org.apache.hadoop.conf.Configurable) { + ((org.apache.hadoop.conf.Configurable) codec).setConf(conf); + } + // Test if the codec is actually usable by creating a test stream + try { + codec.createOutputStream(new java.io.ByteArrayOutputStream()); + return Optional.of(codec); + } catch (Throwable e) { + LOG.warn("Failed to create compression, so use none", e); + } + } + } + } catch (Throwable e) { + LOG.warn("Failed to create compression, so use none", e); + } + return Optional.empty(); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java index 6dce1b470a..59acbdffa6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java @@ -19,11 +19,13 @@ package org.apache.paimon.format.csv; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.CompressionType; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.FormatWriter; import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.TextCompression; import org.apache.paimon.fs.CloseShieldOutputStream; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; @@ -34,6 +36,7 @@ import org.apache.paimon.types.RowType; import javax.annotation.Nullable; +import java.io.IOException; import java.util.List; /** CSV {@link FileFormat}. */ @@ -55,12 +58,12 @@ public class CsvFileFormat extends FileFormat { @Override public FormatReaderFactory createReaderFactory( RowType projectedRowType, @Nullable List<Predicate> filters) { - return new CsvReaderFactory(projectedRowType, new CsvOptions(options)); + return new CsvReaderFactory(projectedRowType, options); } @Override public FormatWriterFactory createWriterFactory(RowType type) { - return new CsvWriterFactory(type, new CsvOptions(options)); + return new CsvWriterFactory(type, options); } @Override @@ -103,16 +106,20 @@ public class CsvFileFormat extends FileFormat { private static class CsvWriterFactory implements FormatWriterFactory { private final RowType rowType; - private final CsvOptions options; + private final Options options; - public CsvWriterFactory(RowType rowType, CsvOptions options) { + public CsvWriterFactory(RowType rowType, Options options) { this.rowType = rowType; this.options = options; } @Override - public FormatWriter create(PositionOutputStream out, String compression) { - return new CsvFormatWriter(new CloseShieldOutputStream(out), rowType, options); + public FormatWriter create(PositionOutputStream out, String compression) + throws IOException { + CompressionType compressionType = + TextCompression.getTextCompressionType(compression, options); + return new CsvFormatWriter( + new CloseShieldOutputStream(out), rowType, options, compressionType); } } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java index c85ab8208e..dbb174528a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.BaseTextFileReader; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DataTypes; @@ -49,20 +50,20 @@ public class CsvFileReader extends BaseTextFileReader { private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE = new ConcurrentHashMap<>(32); - private final CsvOptions options; + private final CsvOptions formatOptions; private final CsvSchema schema; private boolean headerSkipped = false; - public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, CsvOptions options) + public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, Options options) throws IOException { - super(fileIO, filePath, rowType); - this.options = options; + super(fileIO, filePath, rowType, options); + this.formatOptions = new CsvOptions(options); this.schema = CsvSchema.emptySchema() - .withQuoteChar(options.quoteCharacter().charAt(0)) - .withColumnSeparator(options.fieldDelimiter().charAt(0)) - .withEscapeChar(options.escapeCharacter().charAt(0)); - if (!options.includeHeader()) { + .withQuoteChar(formatOptions.quoteCharacter().charAt(0)) + .withColumnSeparator(formatOptions.fieldDelimiter().charAt(0)) + .withEscapeChar(formatOptions.escapeCharacter().charAt(0)); + if (!formatOptions.includeHeader()) { this.schema.withoutHeader(); } } @@ -80,7 +81,7 @@ public class CsvFileReader extends BaseTextFileReader { @Override protected void setupReading() throws IOException { // Skip header if needed - if (options.includeHeader() && !headerSkipped) { + if (formatOptions.includeHeader() && !headerSkipped) { bufferedReader.readLine(); headerSkipped = true; } @@ -108,7 +109,7 @@ public class CsvFileReader extends BaseTextFileReader { String field = fields[i]; // Fast path for null values - if (field == null || field.equals(options.nullLiteral()) || field.isEmpty()) { + if (field == null || field.equals(formatOptions.nullLiteral()) || field.isEmpty()) { values[i] = null; continue; } @@ -122,7 +123,7 @@ public class CsvFileReader extends BaseTextFileReader { /** Optimized field parsing with caching and fast paths for common types. */ private Object parseFieldOptimized(String field, DataType dataType) { - if (field == null || field.equals(options.nullLiteral())) { + if (field == null || field.equals(formatOptions.nullLiteral())) { return null; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java index 754bb5a192..87d1abb5d2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java @@ -22,7 +22,9 @@ import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.BaseTextFileWriter; +import org.apache.paimon.format.CompressionType; import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; @@ -40,20 +42,22 @@ public class CsvFormatWriter extends BaseTextFileWriter { private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE = new ConcurrentHashMap<>(32); - private final CsvOptions options; + private final CsvOptions csvOptions; private boolean headerWritten = false; private final StringBuilder stringBuilder; - public CsvFormatWriter(PositionOutputStream out, RowType rowType, CsvOptions options) { - super(out, rowType); - this.options = options; + public CsvFormatWriter( + PositionOutputStream out, RowType rowType, Options options, CompressionType compression) + throws IOException { + super(out, rowType, options, compression); + this.csvOptions = new CsvOptions(options); this.stringBuilder = new StringBuilder(); } @Override public void addElement(InternalRow element) throws IOException { // Write header if needed - if (options.includeHeader() && !headerWritten) { + if (csvOptions.includeHeader() && !headerWritten) { writeHeader(); headerWritten = true; } @@ -64,7 +68,7 @@ public class CsvFormatWriter extends BaseTextFileWriter { int fieldCount = rowType.getFieldCount(); for (int i = 0; i < fieldCount; i++) { if (i > 0) { - stringBuilder.append(options.fieldDelimiter()); + stringBuilder.append(csvOptions.fieldDelimiter()); } Object value = @@ -72,7 +76,7 @@ public class CsvFormatWriter extends BaseTextFileWriter { String fieldValue = escapeField(castToStringOptimized(value, rowType.getTypeAt(i))); stringBuilder.append(fieldValue); } - stringBuilder.append(options.lineDelimiter()); + stringBuilder.append(csvOptions.lineDelimiter()); writer.write(stringBuilder.toString()); } @@ -83,24 +87,24 @@ public class CsvFormatWriter extends BaseTextFileWriter { int fieldCount = rowType.getFieldCount(); for (int i = 0; i < fieldCount; i++) { if (i > 0) { - stringBuilder.append(options.fieldDelimiter()); + stringBuilder.append(csvOptions.fieldDelimiter()); } stringBuilder.append(escapeField(rowType.getFieldNames().get(i))); } - stringBuilder.append(options.lineDelimiter()); + stringBuilder.append(csvOptions.lineDelimiter()); writer.write(stringBuilder.toString()); } private String escapeField(String field) { if (field == null) { - return options.nullLiteral(); + return csvOptions.nullLiteral(); } // Optimized escaping with early exit checks boolean needsQuoting = - field.indexOf(options.fieldDelimiter().charAt(0)) >= 0 - || field.indexOf(options.lineDelimiter().charAt(0)) >= 0 - || field.indexOf(options.quoteCharacter().charAt(0)) >= 0; + field.indexOf(csvOptions.fieldDelimiter().charAt(0)) >= 0 + || field.indexOf(csvOptions.lineDelimiter().charAt(0)) >= 0 + || field.indexOf(csvOptions.quoteCharacter().charAt(0)) >= 0; if (!needsQuoting) { return field; @@ -109,9 +113,9 @@ public class CsvFormatWriter extends BaseTextFileWriter { // Only escape if needed String escaped = field.replace( - options.quoteCharacter(), - options.escapeCharacter() + options.quoteCharacter()); - return options.quoteCharacter() + escaped + options.quoteCharacter(); + csvOptions.quoteCharacter(), + csvOptions.escapeCharacter() + csvOptions.quoteCharacter()); + return csvOptions.quoteCharacter() + escaped + csvOptions.quoteCharacter(); } /** Optimized string casting with caching and fast paths for common types. */ diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java index cb26e3faf8..23fcf09fd6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java @@ -20,6 +20,7 @@ package org.apache.paimon.format.csv; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.options.Options; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.types.RowType; @@ -29,9 +30,9 @@ import java.io.IOException; public class CsvReaderFactory implements FormatReaderFactory { private final RowType rowType; - private final CsvOptions options; + private final Options options; - public CsvReaderFactory(RowType rowType, CsvOptions options) { + public CsvReaderFactory(RowType rowType, Options options) { this.rowType = rowType; this.options = options; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java index 49790fefc2..54ccf59602 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java @@ -19,11 +19,13 @@ package org.apache.paimon.format.json; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.CompressionType; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.FormatWriter; import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.TextCompression; import org.apache.paimon.fs.CloseShieldOutputStream; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; @@ -34,6 +36,7 @@ import org.apache.paimon.types.RowType; import javax.annotation.Nullable; +import java.io.IOException; import java.util.List; /** JSON {@link FileFormat}. */ @@ -45,18 +48,18 @@ public class JsonFileFormat extends FileFormat { public JsonFileFormat(FormatContext context) { super(IDENTIFIER); - this.options = getIdentifierPrefixOptions(context.options()); + this.options = context.options(); } @Override public FormatReaderFactory createReaderFactory( RowType projectedRowType, @Nullable List<Predicate> filters) { - return new JsonReaderFactory(projectedRowType, new JsonOptions(options)); + return new JsonReaderFactory(projectedRowType, options); } @Override public FormatWriterFactory createWriterFactory(RowType type) { - return new JsonWriterFactory(type, new JsonOptions(options)); + return new JsonWriterFactory(type, options); } @Override @@ -102,16 +105,20 @@ public class JsonFileFormat extends FileFormat { private static class JsonWriterFactory implements FormatWriterFactory { private final RowType rowType; - private final JsonOptions options; + private final Options options; - public JsonWriterFactory(RowType rowType, JsonOptions options) { + public JsonWriterFactory(RowType rowType, Options options) { this.rowType = rowType; this.options = options; } @Override - public FormatWriter create(PositionOutputStream out, String compression) { - return new JsonFormatWriter(new CloseShieldOutputStream(out), rowType, options); + public FormatWriter create(PositionOutputStream out, String compression) + throws IOException { + CompressionType compressionType = + TextCompression.getTextCompressionType(compression, options); + return new JsonFormatWriter( + new CloseShieldOutputStream(out), rowType, options, compressionType); } } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java index 96d5ecfa5d..b46f92835e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java @@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.BaseTextFileReader; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -53,10 +54,10 @@ public class JsonFileReader extends BaseTextFileReader { private final JsonOptions options; - public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, JsonOptions options) + public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, Options options) throws IOException { - super(fileIO, filePath, rowType); - this.options = options; + super(fileIO, filePath, rowType, options); + this.options = new JsonOptions(options); } @Override @@ -67,7 +68,8 @@ public class JsonFileReader extends BaseTextFileReader { @Override protected InternalRow parseLine(String line) throws IOException { try { - return convertJsonStringToRow(line, rowType, options); + JsonNode jsonNode = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line); + return (InternalRow) convertJsonValue(jsonNode, rowType, options); } catch (JsonProcessingException e) { if (options.ignoreParseErrors()) { return null; @@ -88,12 +90,6 @@ public class JsonFileReader extends BaseTextFileReader { // No additional JSON-specific iterator logic needed } - private InternalRow convertJsonStringToRow(String line, RowType rowType, JsonOptions options) - throws JsonProcessingException { - JsonNode jsonNode = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line); - return (InternalRow) convertJsonValue(jsonNode, rowType, options); - } - private Object convertJsonValue(JsonNode node, DataType dataType, JsonOptions options) { if (node == null || node.isNull()) { return null; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java index de6b1a70f9..414a7eb0fd 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java @@ -25,7 +25,9 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.BaseTextFileWriter; +import org.apache.paimon.format.CompressionType; import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.options.Options; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -52,9 +54,13 @@ public class JsonFormatWriter extends BaseTextFileWriter { private final char lineDelimiter; public JsonFormatWriter( - PositionOutputStream outputStream, RowType rowType, JsonOptions options) { - super(outputStream, rowType); - this.lineDelimiter = options.getLineDelimiter().charAt(0); + PositionOutputStream outputStream, + RowType rowType, + Options options, + CompressionType compressionType) + throws IOException { + super(outputStream, rowType, options, compressionType); + this.lineDelimiter = (new JsonOptions(options)).getLineDelimiter().charAt(0); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java index b82d77d948..709f2d345a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.types.RowType; @@ -31,9 +32,9 @@ import java.io.IOException; public class JsonReaderFactory implements FormatReaderFactory { private final RowType projectedRowType; - private final JsonOptions options; + private final Options options; - public JsonReaderFactory(RowType projectedRowType, JsonOptions options) { + public JsonReaderFactory(RowType projectedRowType, Options options) { this.projectedRowType = projectedRowType; this.options = options; } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java b/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java new file mode 100644 index 0000000000..7f616597d8 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Base class for compression tests across different file formats. */ +public abstract class BaseCompressionTest { + + @TempDir protected java.nio.file.Path tempDir; + + protected final RowType rowType = + RowType.of( + DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.BOOLEAN()); + + protected final List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Alice"), 100.5, true), + GenericRow.of(2, BinaryString.fromString("Bob"), 200.75, false), + GenericRow.of(3, BinaryString.fromString("Charlie"), 300.25, true), + GenericRow.of(4, BinaryString.fromString("Diana"), 400.0, false)); + + private List<CompressionType> compressionTypes = + Arrays.asList( + CompressionType.NONE, + CompressionType.GZIP, + CompressionType.BZIP2, + CompressionType.DEFLATE, + CompressionType.ZSTD); + + /** Returns the file format for testing. */ + protected abstract FileFormat createFileFormat(Options options); + + /** Returns the file extension for the format. */ + protected abstract String getFormatExtension(); + + @Test + void testCompression() { + compressionTypes.forEach( + compression -> { + try { + testCompressionRoundTrip( + compression.value(), + String.format( + "test_compress.%s.%s", + getFormatExtension(), compression.fileExtension())); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + void testCompressionInitFail() throws IOException { + CompressionType compression = CompressionType.SNAPPY; + assertThrows( + IOException.class, + () -> + testCompressionRoundTrip( + compression.value(), + String.format( + "test_compress.%s.%s", + getFormatExtension(), compression.fileExtension()))); + } + + @Test + void testCompressionDetectionFromFileName() { + compressionTypes.forEach( + compression -> { + try { + testAutoCompressionDetection( + "test_auto." + + getFormatExtension() + + "." + + compression.fileExtension(), + compression.value()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + void testUnsupportedCompressionFormat() throws IOException { + Options options = new Options(); + options.set(CoreOptions.FILE_COMPRESSION, "unsupported"); + testCompressionRoundTripWithOptions(options, "test_file_unsupported_compress"); + } + + protected void testCompressionRoundTrip(String compression, String fileName) + throws IOException { + Options options = new Options(); + options.set(CoreOptions.FILE_COMPRESSION, compression); + testCompressionRoundTripWithOptions(options, fileName); + } + + protected void testCompressionRoundTripWithOptions(Options options, String fileName) + throws IOException { + FileFormat format = createFileFormat(options); + + // Validate the format and compression + format.validateDataFields(rowType); + + Path filePath = new Path(tempDir.resolve(fileName).toString()); + FileIO fileIO = new LocalFileIO(); + + // Write data with compression + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + try (FormatWriter writer = + writerFactory.create( + fileIO.newOutputStream(filePath, false), + options.get(CoreOptions.FILE_COMPRESSION))) { + for (InternalRow row : testData) { + writer.addElement(row); + } + } + + // Read data back + FormatReaderFactory readerFactory = format.createReaderFactory(rowType, null); + List<InternalRow> readData = new ArrayList<>(); + + try (RecordReader<InternalRow> reader = + readerFactory.createReader( + new FormatReaderContext(fileIO, filePath, fileIO.getFileSize(filePath)))) { + reader.forEachRemaining(readData::add); + } + + // Verify data integrity + assertThat(readData).hasSize(testData.size()); + for (int i = 0; i < testData.size(); i++) { + InternalRow expected = testData.get(i); + InternalRow actual = readData.get(i); + + assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0)); + assertThat(actual.getString(1).toString()).isEqualTo(expected.getString(1).toString()); + assertThat(actual.getDouble(2)).isEqualTo(expected.getDouble(2)); + assertThat(actual.getBoolean(3)).isEqualTo(expected.getBoolean(3)); + } + } + + protected void testAutoCompressionDetection(String fileName, String compression) + throws IOException { + // Write file with compression + Options writeOptions = new Options(); + writeOptions.set(CoreOptions.FILE_COMPRESSION, compression); + + FileFormat format = createFileFormat(writeOptions); + Path filePath = new Path(tempDir.resolve(fileName).toString()); + FileIO fileIO = new LocalFileIO(); + + // Write compressed data + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + try (FormatWriter writer = + writerFactory.create(fileIO.newOutputStream(filePath, false), compression)) { + writer.addElement(testData.get(0)); // Write just one row for this test + } + + // Read back with auto-detection (no compression specified in read options) + Options readOptions = new Options(); + readOptions.set(CoreOptions.FILE_COMPRESSION, "none"); // Default to none + + FileFormat readFormat = createFileFormat(readOptions); + FormatReaderFactory readerFactory = readFormat.createReaderFactory(rowType, null); + + List<InternalRow> readData = new ArrayList<>(); + try (RecordReader<InternalRow> reader = + readerFactory.createReader( + new FormatReaderContext(fileIO, filePath, fileIO.getFileSize(filePath)))) { + reader.forEachRemaining(readData::add); + } + + // Should successfully read the data regardless of compression + assertThat(readData).hasSize(1); + InternalRow expected = testData.get(0); + InternalRow actual = readData.get(0); + assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0)); + assertThat(actual.getString(1).toString()).isEqualTo(expected.getString(1).toString()); + } + + protected FormatContext createFormatContext(Options options) { + return new FormatContext(options, 1024, 1024); + } +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java new file mode 100644 index 0000000000..159aa3715e --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.csv; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.format.BaseCompressionTest; +import org.apache.paimon.format.CompressionType; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.options.Options; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +/** Test for CSV compression functionality. */ +class CsvCompressionTest extends BaseCompressionTest { + + @Override + protected FileFormat createFileFormat(Options options) { + return new CsvFileFormat(createFormatContext(options)); + } + + @Override + protected String getFormatExtension() { + return "csv"; + } + + @Test + void testCompressionWithCustomOptions() throws IOException { + Options options = new Options(); + options.set(CoreOptions.FILE_COMPRESSION, CompressionType.GZIP.value()); + options.set(CsvOptions.FIELD_DELIMITER, ";"); + options.set(CsvOptions.INCLUDE_HEADER, true); + + String fileName = "test_custom_options.csv.gz"; + testCompressionRoundTripWithOptions(options, fileName); + } +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java index bed689bd4d..2b12874588 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java @@ -18,12 +18,14 @@ package org.apache.paimon.format.csv; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.format.CompressionType; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.format.FormatReadWriteTest; @@ -61,7 +63,9 @@ public class CsvFileFormatTest extends FormatReadWriteTest { @Override protected FileFormat fileFormat() { - return new CsvFileFormatFactory().create(new FormatContext(new Options(), 1024, 1024)); + Options options = new Options(); + options.set(CoreOptions.FILE_COMPRESSION, compression()); + return new CsvFileFormatFactory().create(new FormatContext(options, 1024, 1024)); } @Test @@ -480,6 +484,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest { return false; } + @Override + public String compression() { + return CompressionType.NONE.value(); + } + @Override public boolean supportDataFileWithoutExtension() { return true; diff --git a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java new file mode 100644 index 0000000000..afd79fa878 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.json; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.format.BaseCompressionTest; +import org.apache.paimon.format.CompressionType; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.options.Options; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +/** Test for JSON compression functionality. */ +class JsonCompressionTest extends BaseCompressionTest { + + @Override + protected FileFormat createFileFormat(Options options) { + return new JsonFileFormat(createFormatContext(options)); + } + + @Override + protected String getFormatExtension() { + return "json"; + } + + @Test + void testCompressionWithCustomJsonOptions() throws IOException { + Options options = new Options(); + options.set(CoreOptions.FILE_COMPRESSION, "gzip"); + options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true); + options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.DROP); + options.set(JsonOptions.LINE_DELIMITER, "\n"); + + String fileName = "test_custom_json_options.json.gz"; + testCompressionRoundTripWithOptions(options, fileName); + } + + @Test + void testJsonCompressionWithComplexData() throws IOException { + // Test with complex JSON structures and different compression formats + testCompressionRoundTrip(CompressionType.GZIP.value(), "test_complex_gzip.json.gz"); + testCompressionRoundTrip( + CompressionType.DEFLATE.value(), "test_complex_deflate.json.deflate"); + testCompressionRoundTrip(CompressionType.NONE.value(), "test_complex_none.json"); + } +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java index dd38623279..82f6d10a22 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java @@ -18,11 +18,13 @@ package org.apache.paimon.format.json; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.format.CompressionType; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory; import org.apache.paimon.format.FormatReadWriteTest; @@ -56,7 +58,14 @@ public class JsonFileFormatTest extends FormatReadWriteTest { @Override protected FileFormat fileFormat() { - return new JsonFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024)); + Options options = new Options(); + options.set(CoreOptions.FILE_COMPRESSION, compression()); + return new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + } + + @Override + public String compression() { + return CompressionType.NONE.value(); } @Test diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java index 51c2e63b5f..d889c1ac66 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java @@ -638,8 +638,8 @@ public class SparkWriteITCase { Assertions.assertEquals(4, files.size()); String defaultExtension = "." + "parquet"; - String newExtension = "." + "zstd" + "." + "parquet"; - // two data files end with ".parquet", two data file end with ".zstd.parquet" + String newExtension = "." + "zst" + "." + "parquet"; + // two data files end with ".parquet", two data file end with ".zst.parquet" Assertions.assertEquals( 2, files.stream() @@ -682,8 +682,8 @@ public class SparkWriteITCase { .filter(name -> name.contains("changelog-")) .collect(Collectors.toList()); String defaultExtension = "." + "parquet"; - String newExtension = "." + "zstd" + "." + "parquet"; - // one changelog file end with ".parquet", one changelog file end with ".zstd.parquet" + String newExtension = "." + "zst" + "." + "parquet"; + // one changelog file end with ".parquet", one changelog file end with ".zst.parquet" Assertions.assertEquals( 1, files.stream()