This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c7cd8e9830 [avro] Fix compression not work in writer (#4628)
c7cd8e9830 is described below

commit c7cd8e98305bdc1331551824e57e227c65b6f4cf
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 4 14:07:52 2024 +0800

    [avro] Fix compression not work in writer (#4628)
    
    This closes #4628.
---
 docs/content/migration/iceberg-compatibility.md    |  2 +-
 .../org/apache/paimon/iceberg/IcebergOptions.java  |  2 +-
 .../iceberg/manifest/IcebergManifestFile.java      |  8 ++------
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 24 ++++++++++++++++------
 .../apache/paimon/format/avro/AvroFileFormat.java  |  2 +-
 .../paimon/format/avro/AvroFileFormatTest.java     | 12 +++++++++++
 6 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md
index 01a03a4526..8e4d3c9017 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -373,7 +373,7 @@ you also need to set some (or all) of the following table options when creating
     </tr>
     <tr>
       <td><h5>metadata.iceberg.manifest-compression</h5></td>
-      <td style="word-wrap: break-word;">gzip</td>
+      <td style="word-wrap: break-word;">snappy</td>
       <td>String</td>
       <td>Compression for Iceberg manifest files.</td>
     </tr>
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 4b59e29c8c..55fbab5158 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -74,7 +74,7 @@ public class IcebergOptions {
             key("metadata.iceberg.manifest-compression")
                     .stringType()
                     .defaultValue(
-                            "gzip") // some Iceberg reader cannot support zstd, for example DuckDB
+                            "snappy") // some Iceberg reader cannot support zstd, for example DuckDB
                     .withDescription("Compression for Iceberg manifest files.");
 
     public static final ConfigOption<Boolean> MANIFEST_LEGACY_VERSION =
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 57484a1f3f..5955da6220 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.iceberg.manifest;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -111,7 +110,7 @@ public class IcebergManifestFile extends ObjectsFile<IcebergManifestEntry> {
     }
 
     public List<IcebergManifestFileMeta> rollingWrite(
-            Iterator<IcebergManifestEntry> entries, long sequenceNumber) throws IOException {
+            Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
         RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> writer =
                 new RollingFileWriter<>(
                         () -> createWriter(sequenceNumber), targetFileSize.getBytes());
@@ -127,10 +126,7 @@ public class IcebergManifestFile extends ObjectsFile<IcebergManifestEntry> {
     public SingleFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> createWriter(
             long sequenceNumber) {
         return new IcebergManifestEntryWriter(
-                writerFactory,
-                pathFactory.newPath(),
-                CoreOptions.FILE_COMPRESSION.defaultValue(),
-                sequenceNumber);
+                writerFactory, pathFactory.newPath(), compression, sequenceNumber);
     }
 
     private class IcebergManifestEntryWriter
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index b069ac031d..e5b550ff94 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
@@ -281,9 +282,10 @@ public class IcebergCompatibilityTest {
         write.write(GenericRow.of(2, 20));
         commit.commit(1, write.prepareCommit(false, 1));
         assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(1L);
+        FileIO fileIO = table.fileIO();
         IcebergMetadata metadata =
                 IcebergMetadata.fromPath(
-                        table.fileIO(), new Path(table.location(), "metadata/v1.metadata.json"));
+                        fileIO, new Path(table.location(), "metadata/v1.metadata.json"));
         assertThat(metadata.snapshots()).hasSize(1);
         assertThat(metadata.currentSnapshotId()).isEqualTo(1);
 
@@ -294,7 +296,7 @@ public class IcebergCompatibilityTest {
         assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(3L);
         metadata =
                 IcebergMetadata.fromPath(
-                        table.fileIO(), new Path(table.location(), "metadata/v3.metadata.json"));
+                        fileIO, new Path(table.location(), "metadata/v3.metadata.json"));
         assertThat(metadata.snapshots()).hasSize(3);
         assertThat(metadata.currentSnapshotId()).isEqualTo(3);
 
@@ -304,15 +306,25 @@ public class IcebergCompatibilityTest {
         IcebergPathFactory pathFactory =
                 new IcebergPathFactory(new Path(table.location(), "metadata"));
         IcebergManifestList manifestList = IcebergManifestList.create(table, pathFactory);
-        assertThat(manifestList.compression()).isEqualTo("gzip");
+        assertThat(manifestList.compression()).isEqualTo("snappy");
 
         IcebergManifestFile manifestFile = IcebergManifestFile.create(table, pathFactory);
-        assertThat(manifestFile.compression()).isEqualTo("gzip");
+        assertThat(manifestFile.compression()).isEqualTo("snappy");
 
         Set<String> usingManifests = new HashSet<>();
         String manifestListFile = new Path(metadata.currentSnapshot().manifestList()).getName();
+
+        assertThat(fileIO.readFileUtf8(new Path(pathFactory.metadataDirectory(), manifestListFile)))
+                .contains("snappy");
+
         for (IcebergManifestFileMeta fileMeta : manifestList.read(manifestListFile)) {
             usingManifests.add(fileMeta.manifestPath());
+            assertThat(
+                            fileIO.readFileUtf8(
+                                    new Path(
+                                            pathFactory.metadataDirectory(),
+                                            fileMeta.manifestPath())))
+                    .contains("snappy");
         }
 
         IcebergManifestList legacyManifestList =
@@ -345,7 +357,7 @@ public class IcebergCompatibilityTest {
         assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
         metadata =
                 IcebergMetadata.fromPath(
-                        table.fileIO(), new Path(table.location(), "metadata/v5.metadata.json"));
+                        fileIO, new Path(table.location(), "metadata/v5.metadata.json"));
         assertThat(metadata.snapshots()).hasSize(3);
         assertThat(metadata.currentSnapshotId()).isEqualTo(5);
 
@@ -358,7 +370,7 @@ public class IcebergCompatibilityTest {
         }
 
         for (String path : unusedFiles) {
-            assertThat(table.fileIO().exists(new Path(path))).isFalse();
+            assertThat(fileIO.exists(new Path(path))).isFalse();
         }
 
         // Test all existing Iceberg snapshots are valid.
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 63a51c0a13..fcce9ae505 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -105,7 +105,7 @@ public class AvroFileFormat extends FileFormat {
         if (compression.equalsIgnoreCase("zstd")) {
             return CodecFactory.zstandardCodec(zstdLevel);
         }
-        return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
+        return CodecFactory.fromString(compression);
     }
 
     /** A {@link FormatWriterFactory} to write {@link InternalRow}. */
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
index 3f6486baae..9c0dbb43fe 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
@@ -198,4 +198,16 @@ public class AvroFileFormatTest {
                 .isInstanceOf(IOException.class)
                 .hasMessageContaining("Artificial exception");
     }
+
+    @Test
+    void testCompression() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull());
+        AvroFileFormat format = new AvroFileFormat(new FormatContext(new Options(), 1024, 1024));
+        LocalFileIO localFileIO = LocalFileIO.create();
+        Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString());
+        try (PositionOutputStream out = localFileIO.newOutputStream(file, false)) {
+            assertThatThrownBy(() -> format.createWriterFactory(rowType).create(out, "unsupported"))
+                    .hasMessageContaining("Unrecognized codec: unsupported");
+        }
+    }
 }

Reply via email to