This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c7cd8e9830 [avro] Fix compression not working in writer (#4628)
c7cd8e9830 is described below
commit c7cd8e98305bdc1331551824e57e227c65b6f4cf
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 4 14:07:52 2024 +0800
[avro] Fix compression not working in writer (#4628)
This closes #4628.
---
docs/content/migration/iceberg-compatibility.md | 2 +-
.../org/apache/paimon/iceberg/IcebergOptions.java | 2 +-
.../iceberg/manifest/IcebergManifestFile.java | 8 ++------
.../paimon/iceberg/IcebergCompatibilityTest.java | 24 ++++++++++++++++------
.../apache/paimon/format/avro/AvroFileFormat.java | 2 +-
.../paimon/format/avro/AvroFileFormatTest.java | 12 +++++++++++
6 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/docs/content/migration/iceberg-compatibility.md
b/docs/content/migration/iceberg-compatibility.md
index 01a03a4526..8e4d3c9017 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -373,7 +373,7 @@ you also need to set some (or all) of the following table
options when creating
</tr>
<tr>
<td><h5>metadata.iceberg.manifest-compression</h5></td>
- <td style="word-wrap: break-word;">gzip</td>
+ <td style="word-wrap: break-word;">snappy</td>
<td>String</td>
<td>Compression for Iceberg manifest files.</td>
</tr>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 4b59e29c8c..55fbab5158 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -74,7 +74,7 @@ public class IcebergOptions {
key("metadata.iceberg.manifest-compression")
.stringType()
.defaultValue(
- "gzip") // some Iceberg reader cannot support
zstd, for example DuckDB
+ "snappy") // some Iceberg reader cannot support
zstd, for example DuckDB
.withDescription("Compression for Iceberg manifest
files.");
public static final ConfigOption<Boolean> MANIFEST_LEGACY_VERSION =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 57484a1f3f..5955da6220 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -18,7 +18,6 @@
package org.apache.paimon.iceberg.manifest;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
@@ -111,7 +110,7 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
}
public List<IcebergManifestFileMeta> rollingWrite(
- Iterator<IcebergManifestEntry> entries, long sequenceNumber)
throws IOException {
+ Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta>
writer =
new RollingFileWriter<>(
() -> createWriter(sequenceNumber),
targetFileSize.getBytes());
@@ -127,10 +126,7 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
public SingleFileWriter<IcebergManifestEntry, IcebergManifestFileMeta>
createWriter(
long sequenceNumber) {
return new IcebergManifestEntryWriter(
- writerFactory,
- pathFactory.newPath(),
- CoreOptions.FILE_COMPRESSION.defaultValue(),
- sequenceNumber);
+ writerFactory, pathFactory.newPath(), compression,
sequenceNumber);
}
private class IcebergManifestEntryWriter
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index b069ac031d..e5b550ff94 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
@@ -281,9 +282,10 @@ public class IcebergCompatibilityTest {
write.write(GenericRow.of(2, 20));
commit.commit(1, write.prepareCommit(false, 1));
assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(1L);
+ FileIO fileIO = table.fileIO();
IcebergMetadata metadata =
IcebergMetadata.fromPath(
- table.fileIO(), new Path(table.location(),
"metadata/v1.metadata.json"));
+ fileIO, new Path(table.location(),
"metadata/v1.metadata.json"));
assertThat(metadata.snapshots()).hasSize(1);
assertThat(metadata.currentSnapshotId()).isEqualTo(1);
@@ -294,7 +296,7 @@ public class IcebergCompatibilityTest {
assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(3L);
metadata =
IcebergMetadata.fromPath(
- table.fileIO(), new Path(table.location(),
"metadata/v3.metadata.json"));
+ fileIO, new Path(table.location(),
"metadata/v3.metadata.json"));
assertThat(metadata.snapshots()).hasSize(3);
assertThat(metadata.currentSnapshotId()).isEqualTo(3);
@@ -304,15 +306,25 @@ public class IcebergCompatibilityTest {
IcebergPathFactory pathFactory =
new IcebergPathFactory(new Path(table.location(), "metadata"));
IcebergManifestList manifestList = IcebergManifestList.create(table,
pathFactory);
- assertThat(manifestList.compression()).isEqualTo("gzip");
+ assertThat(manifestList.compression()).isEqualTo("snappy");
IcebergManifestFile manifestFile = IcebergManifestFile.create(table,
pathFactory);
- assertThat(manifestFile.compression()).isEqualTo("gzip");
+ assertThat(manifestFile.compression()).isEqualTo("snappy");
Set<String> usingManifests = new HashSet<>();
String manifestListFile = new
Path(metadata.currentSnapshot().manifestList()).getName();
+
+ assertThat(fileIO.readFileUtf8(new
Path(pathFactory.metadataDirectory(), manifestListFile)))
+ .contains("snappy");
+
for (IcebergManifestFileMeta fileMeta :
manifestList.read(manifestListFile)) {
usingManifests.add(fileMeta.manifestPath());
+ assertThat(
+ fileIO.readFileUtf8(
+ new Path(
+ pathFactory.metadataDirectory(),
+ fileMeta.manifestPath())))
+ .contains("snappy");
}
IcebergManifestList legacyManifestList =
@@ -345,7 +357,7 @@ public class IcebergCompatibilityTest {
assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
metadata =
IcebergMetadata.fromPath(
- table.fileIO(), new Path(table.location(),
"metadata/v5.metadata.json"));
+ fileIO, new Path(table.location(),
"metadata/v5.metadata.json"));
assertThat(metadata.snapshots()).hasSize(3);
assertThat(metadata.currentSnapshotId()).isEqualTo(5);
@@ -358,7 +370,7 @@ public class IcebergCompatibilityTest {
}
for (String path : unusedFiles) {
- assertThat(table.fileIO().exists(new Path(path))).isFalse();
+ assertThat(fileIO.exists(new Path(path))).isFalse();
}
// Test all existing Iceberg snapshots are valid.
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 63a51c0a13..fcce9ae505 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -105,7 +105,7 @@ public class AvroFileFormat extends FileFormat {
if (compression.equalsIgnoreCase("zstd")) {
return CodecFactory.zstandardCodec(zstdLevel);
}
- return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
+ return CodecFactory.fromString(compression);
}
/** A {@link FormatWriterFactory} to write {@link InternalRow}. */
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
index 3f6486baae..9c0dbb43fe 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
@@ -198,4 +198,16 @@ public class AvroFileFormatTest {
.isInstanceOf(IOException.class)
.hasMessageContaining("Artificial exception");
}
+
+ @Test
+ void testCompression() throws IOException {
+ RowType rowType = DataTypes.ROW(DataTypes.INT().notNull());
+ AvroFileFormat format = new AvroFileFormat(new FormatContext(new
Options(), 1024, 1024));
+ LocalFileIO localFileIO = LocalFileIO.create();
+ Path file = new Path(new Path(tempPath.toUri()),
UUID.randomUUID().toString());
+ try (PositionOutputStream out = localFileIO.newOutputStream(file,
false)) {
+ assertThatThrownBy(() ->
format.createWriterFactory(rowType).create(out, "unsupported"))
+ .hasMessageContaining("Unrecognized codec: unsupported");
+ }
+ }
}