This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 08ac7844d2 Core: Propagate Avro compression settings to manifest
writers (#15652)
08ac7844d2 is described below
commit 08ac7844d2e223c3c4209f2f941acf4dd55c1bfb
Author: Russell Spitzer <[email protected]>
AuthorDate: Fri Mar 20 12:04:20 2026 -0500
Core: Propagate Avro compression settings to manifest writers (#15652)
---
.../java/org/apache/iceberg/ManifestBenchmark.java | 279 +++++++++++++++++++++
.../main/java/org/apache/iceberg/InternalData.java | 14 ++
.../java/org/apache/iceberg/ManifestFiles.java | 116 ++++++++-
.../java/org/apache/iceberg/ManifestWriter.java | 78 ++++--
.../java/org/apache/iceberg/SnapshotProducer.java | 33 ++-
.../java/org/apache/iceberg/TableProperties.java | 6 +
.../apache/iceberg/TestManifestWriterVersions.java | 31 +++
.../org/apache/iceberg/TestSnapshotProducer.java | 19 ++
.../org/apache/iceberg/avro/AvroTestHelpers.java | 15 +-
9 files changed, 566 insertions(+), 25 deletions(-)
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
new file mode 100644
index 0000000000..cbd372b7a4
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that measures manifest read/write performance across compression codecs.
+ *
+ * <p>Entry counts are calibrated per column count via {@link #ENTRY_BASE}. Set to 300_000 for ~8 MB
+ * manifests (matching the default {@code commit.manifest.target-size-bytes}) or 15_000 for ~400 KB.
+ *
+ * <p>To run this benchmark:
+ *
+ * <pre>{@code
+ * # all combinations
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
+ *
+ * # single codec
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * -PjmhParams="codec=gzip"
+ * }</pre>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 6)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class ManifestBenchmark {
+
+ static final int ENTRY_BASE = 300_000;
+
+ private static final int FORMAT_VERSION = 4;
+
+ private static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.required(3, "customer", Types.StringType.get()));
+
+ private static final PartitionSpec SPEC =
+
PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
+
+ @Param({"gzip", "snappy", "zstd", "uncompressed"})
+ private String codec;
+
+ @Param({"true", "false"})
+ private String partitioned;
+
+ @Param({"10", "50", "100"})
+ private int numCols;
+
+ private PartitionSpec spec;
+ private Map<Integer, PartitionSpec> specsById;
+ private Map<String, String> writerProperties;
+ private List<DataFile> dataFiles;
+ private int numEntries;
+
+ private String writeBaseDir;
+ private OutputFile writeOutputFile;
+
+ private String readBaseDir;
+ private ManifestFile readManifest;
+
+  /**
+   * Prepares the per-trial fixtures for the current {@code codec}, {@code partitioned} and
+   * {@code numCols} parameter combination.
+   *
+   * <p>Selects the partition spec, builds the writer properties that carry the Avro codec,
+   * generates the data files to write, and pre-writes the manifest used by the read benchmark.
+   */
+  @Setup(Level.Trial)
+  public void setupTrial() {
+    if (Boolean.parseBoolean(partitioned)) {
+      this.spec = SPEC;
+    } else {
+      this.spec = PartitionSpec.unpartitioned();
+    }
+
+    this.specsById = Map.of(spec.specId(), spec);
+    this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
+
+    // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 → ~400 KB manifests
+    this.numEntries = ENTRY_BASE / numCols;
+
+    // order matters: data files need spec and numEntries; the read manifest needs the data files
+    this.dataFiles = generateDataFiles();
+    setupReadManifest();
+  }
+
+ @Setup(Level.Invocation)
+ public void setupWriteInvocation() throws IOException {
+ this.writeBaseDir =
Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+ this.writeOutputFile =
+ org.apache.iceberg.Files.localOutput(
+ String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+
+ for (DataFile file : dataFiles) {
+ file.path();
+ file.fileSizeInBytes();
+ file.recordCount();
+ }
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDownTrial() {
+ cleanDir(readBaseDir);
+ readBaseDir = null;
+ readManifest = null;
+ dataFiles = null;
+ }
+
+ @TearDown(Level.Invocation)
+ public void tearDownInvocation() {
+ cleanDir(writeBaseDir);
+ writeBaseDir = null;
+ writeOutputFile = null;
+ }
+
+  /** Auxiliary JMH counters holding the size of the manifest produced by the write benchmark. */
+  @AuxCounters(AuxCounters.Type.EVENTS)
+  @State(Scope.Thread)
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public static class FileSizeCounters {
+    /** Manifest length in bytes divided by 1024 * 1024. */
+    public double manifestSizeMB;
+
+    /** Clears the counter before every invocation. */
+    @Setup(Level.Invocation)
+    public void reset() {
+      this.manifestSizeMB = 0.0d;
+    }
+  }
+
+ @Benchmark
+ @Threads(1)
+ public ManifestFile writeManifest(FileSizeCounters counters) throws
IOException {
+ ManifestWriter<DataFile> writer =
+ ManifestFiles.write(FORMAT_VERSION, spec, writeOutputFile, 1L,
writerProperties);
+
+ try (ManifestWriter<DataFile> w = writer) {
+ for (DataFile file : dataFiles) {
+ w.add(file);
+ }
+ }
+
+ ManifestFile manifest = writer.toManifestFile();
+ counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
+ return manifest;
+ }
+
+  /**
+   * Reads every entry of the pre-written manifest and hands it to the blackhole.
+   *
+   * @param blackhole JMH sink that consumes each data file that is read
+   * @throws IOException if closing the manifest iterator fails
+   */
+  @Benchmark
+  @Threads(1)
+  public void readManifest(Blackhole blackhole) throws IOException {
+    TestTables.LocalFileIO io = new TestTables.LocalFileIO();
+    ManifestReader<DataFile> reader = ManifestFiles.read(readManifest, io, specsById);
+
+    try (CloseableIterator<DataFile> entries = reader.iterator()) {
+      while (entries.hasNext()) {
+        DataFile dataFile = entries.next();
+        blackhole.consume(dataFile);
+      }
+    }
+  }
+
+  /**
+   * Writes the manifest that the read benchmark consumes into a new temporary directory.
+   *
+   * <p>Any {@link IOException} raised while creating the directory or closing the writer is
+   * rethrown as an {@link UncheckedIOException}.
+   */
+  private void setupReadManifest() {
+    try {
+      this.readBaseDir = Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+
+      String manifestLocation = String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir);
+      OutputFile manifestOutput = org.apache.iceberg.Files.localOutput(manifestLocation);
+
+      ManifestWriter<DataFile> writer =
+          ManifestFiles.write(FORMAT_VERSION, spec, manifestOutput, 1L, writerProperties);
+
+      try (writer) {
+        for (DataFile dataFile : dataFiles) {
+          writer.add(dataFile);
+        }
+      }
+
+      // the manifest file is requested only after the writer has been closed
+      this.readManifest = writer.toManifestFile();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Builds {@code numEntries} Parquet data files for the current spec.
+   *
+   * <p>Metrics come from a {@link Random} seeded with a fixed value, so repeated calls with the
+   * same parameters see the same random sequence. A partition path is set only when the spec is
+   * partitioned.
+   *
+   * @return the generated data files
+   */
+  private List<DataFile> generateDataFiles() {
+    Random random = new Random(42);
+    boolean hasPartitions = !spec.isUnpartitioned();
+    List<DataFile> generated = Lists.newArrayListWithCapacity(numEntries);
+
+    for (int index = 0; index < numEntries; index++) {
+      String location = String.format(Locale.ROOT, "/path/to/data-%d.parquet", index);
+
+      DataFiles.Builder builder =
+          DataFiles.builder(spec)
+              .withFormat(FileFormat.PARQUET)
+              .withPath(location)
+              .withFileSizeInBytes(1024 + index)
+              .withRecordCount(1000 + index)
+              .withMetrics(randomMetrics(random, numCols));
+
+      if (hasPartitions) {
+        String partitionPath =
+            String.format(
+                Locale.ROOT,
+                "id=%d/data=val-%d/customer=cust-%d",
+                index % 100,
+                index % 50,
+                index % 200);
+        builder.withPartitionPath(partitionPath);
+      }
+
+      generated.add(builder.build());
+    }
+
+    return generated;
+  }
+
+  /**
+   * Creates metrics with random per-column statistics for column IDs {@code 0} to {@code cols - 1}.
+   *
+   * @param random source of randomness; the values are drawn in a fixed order per column
+   * @param cols number of columns to generate statistics for
+   * @return metrics carrying a row count, column sizes, value/null/NaN counts and 8-byte bounds
+   */
+  static Metrics randomMetrics(Random random, int cols) {
+    long rowCount = 100_000L + random.nextInt(1000);
+
+    Map<Integer, Long> sizes = Maps.newHashMap();
+    Map<Integer, Long> values = Maps.newHashMap();
+    Map<Integer, Long> nulls = Maps.newHashMap();
+    Map<Integer, Long> nans = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowers = Maps.newHashMap();
+    Map<Integer, ByteBuffer> uppers = Maps.newHashMap();
+
+    // the draw order below must stay stable so that a seeded Random yields the same metrics
+    for (int col = 0; col < cols; col++) {
+      sizes.put(col, 1_000_000L + random.nextInt(100_000));
+      values.put(col, 100_000L + random.nextInt(100));
+      nulls.put(col, (long) random.nextInt(5));
+      nans.put(col, (long) random.nextInt(5));
+      lowers.put(col, randomBound(random));
+      uppers.put(col, randomBound(random));
+    }
+
+    return new Metrics(rowCount, sizes, values, nulls, nans, lowers, uppers);
+  }
+
+  /** Wraps 8 freshly drawn random bytes in a buffer to serve as a lower or upper bound. */
+  private static ByteBuffer randomBound(Random random) {
+    byte[] bytes = new byte[8];
+    random.nextBytes(bytes);
+    return ByteBuffer.wrap(bytes);
+  }
+
+  /**
+   * Deletes a directory without reporting failures; does nothing when {@code dir} is null.
+   *
+   * @param dir path of the directory to delete, or null
+   */
+  private static void cleanDir(String dir) {
+    if (dir == null) {
+      return;
+    }
+
+    File target = new File(dir);
+    FileUtils.deleteQuietly(target);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java
b/core/src/main/java/org/apache/iceberg/InternalData.java
index fa39d23e43..040905f032 100644
--- a/core/src/main/java/org/apache/iceberg/InternalData.java
+++ b/core/src/main/java/org/apache/iceberg/InternalData.java
@@ -114,6 +114,20 @@ public class InternalData {
*/
WriteBuilder set(String property, String value);
+    /**
+     * Apply every entry of a map as a writer configuration property.
+     *
+     * <p>Each entry is forwarded to {@link #set(String, String)}. Writer configuration affects
+     * writer behavior; to add file metadata properties, use {@link #meta(Map)} instead.
+     *
+     * @param properties a map of writer config properties
+     * @return this for method chaining
+     */
+    default WriteBuilder set(Map<String, String> properties) {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        set(property.getKey(), property.getValue());
+      }
+
+      return this;
+    }
+
/**
* Set a file metadata property.
*
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 63cee1e856..ffeff9c991 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.ManifestReader.FileType;
import org.apache.iceberg.avro.AvroEncoderUtil;
@@ -211,6 +212,25 @@ public class ManifestFiles {
return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId,
null);
}
+  /**
+   * Create a new data {@link ManifestWriter} for a format version, with writer properties.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param encryptedOutputFile an {@link EncryptedOutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param writerProperties properties passed through to the underlying file writer
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DataFile> write(
+      int formatVersion,
+      PartitionSpec spec,
+      EncryptedOutputFile encryptedOutputFile,
+      Long snapshotId,
+      Map<String, String> writerProperties) {
+    // this overload takes no first row ID, so none is passed on
+    Long firstRowId = null;
+    return newWriter(
+        formatVersion, spec, encryptedOutputFile, snapshotId, firstRowId, writerProperties);
+  }
+
/**
* Create a new {@link ManifestWriter} for the given format version.
*
@@ -227,15 +247,54 @@ public class ManifestFiles {
EncryptedOutputFile encryptedOutputFile,
Long snapshotId,
Long firstRowId) {
+ return newWriter(
+ formatVersion, spec, encryptedOutputFile, snapshotId, firstRowId,
Collections.emptyMap());
+ }
+
+  /**
+   * Create a new data {@link ManifestWriter} for a plain output file, with writer properties.
+   *
+   * <p>The output file is wrapped with {@link EncryptedFiles#plainAsEncryptedOutput(OutputFile)}
+   * before the writer is created.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param writerProperties properties passed through to the underlying file writer
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DataFile> write(
+      int formatVersion,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      Long snapshotId,
+      Map<String, String> writerProperties) {
+    EncryptedOutputFile plainOutput = EncryptedFiles.plainAsEncryptedOutput(outputFile);
+    // this overload takes no first row ID, so none is passed on
+    Long firstRowId = null;
+    return newWriter(formatVersion, spec, plainOutput, snapshotId, firstRowId, writerProperties);
+  }
+
+ @VisibleForTesting
+ static ManifestWriter<DataFile> newWriter(
+ int formatVersion,
+ PartitionSpec spec,
+ EncryptedOutputFile encryptedOutputFile,
+ Long snapshotId,
+ Long firstRowId,
+ Map<String, String> writerProperties) {
switch (formatVersion) {
case 1:
- return new ManifestWriter.V1Writer(spec, encryptedOutputFile,
snapshotId);
+ return new ManifestWriter.V1Writer(spec, encryptedOutputFile,
snapshotId, writerProperties);
case 2:
- return new ManifestWriter.V2Writer(spec, encryptedOutputFile,
snapshotId);
+ return new ManifestWriter.V2Writer(spec, encryptedOutputFile,
snapshotId, writerProperties);
case 3:
- return new ManifestWriter.V3Writer(spec, encryptedOutputFile,
snapshotId, firstRowId);
+ return new ManifestWriter.V3Writer(
+ spec, encryptedOutputFile, snapshotId, firstRowId,
writerProperties);
case 4:
- return new ManifestWriter.V4Writer(spec, encryptedOutputFile,
snapshotId, firstRowId);
+ return new ManifestWriter.V4Writer(
+ spec, encryptedOutputFile, snapshotId, firstRowId,
writerProperties);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
@@ -276,6 +335,30 @@ public class ManifestFiles {
formatVersion, spec,
EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
}
+  /**
+   * Create a new delete {@link ManifestWriter} for a plain output file, with writer properties.
+   *
+   * <p>The output file is wrapped with {@link EncryptedFiles#plainAsEncryptedOutput(OutputFile)}
+   * and the call is delegated to the {@link EncryptedOutputFile} overload.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param writerProperties properties passed through to the underlying file writer
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(
+      int formatVersion,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      Long snapshotId,
+      Map<String, String> writerProperties) {
+    EncryptedOutputFile plainOutput = EncryptedFiles.plainAsEncryptedOutput(outputFile);
+    return writeDeleteManifest(formatVersion, spec, plainOutput, snapshotId, writerProperties);
+  }
+
/**
* Create a new {@link ManifestWriter} for the given format version.
*
@@ -287,15 +370,34 @@ public class ManifestFiles {
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion, PartitionSpec spec, EncryptedOutputFile outputFile,
Long snapshotId) {
+ return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId,
Collections.emptyMap());
+ }
+
+ /**
+ * Create a new {@link ManifestWriter} for the given format version with
custom writer properties.
+ *
+ * @param formatVersion a target format version
+ * @param spec a {@link PartitionSpec}
+ * @param outputFile an {@link EncryptedOutputFile} where the manifest will
be written
+ * @param snapshotId a snapshot ID for the manifest entries, or null for an
inherited ID
+ * @param writerProperties properties passed through to the underlying file
writer
+ * @return a manifest writer
+ */
+ public static ManifestWriter<DeleteFile> writeDeleteManifest(
+ int formatVersion,
+ PartitionSpec spec,
+ EncryptedOutputFile outputFile,
+ Long snapshotId,
+ Map<String, String> writerProperties) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1
table");
case 2:
- return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
+ return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId,
writerProperties);
case 3:
- return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId);
+ return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId,
writerProperties);
case 4:
- return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId);
+ return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId,
writerProperties);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 43b8e3ed70..7d85f991b0 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Map;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
@@ -47,6 +48,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
private final GenericManifestEntry<F> reused;
private final PartitionSummary stats;
private final Long firstRowId;
+ private final Map<String, String> writerProperties;
private boolean closed = false;
private int addedFiles = 0;
@@ -58,9 +60,14 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
private Long minDataSequenceNumber = null;
private ManifestWriter(
- PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, Long
firstRowId) {
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Long firstRowId,
+ Map<String, String> writerProperties) {
this.file = file.encryptingOutputFile();
this.specId = spec.specId();
+ this.writerProperties = writerProperties;
this.writer = newAppender(spec, this.file);
this.snapshotId = snapshotId;
this.reused =
@@ -75,6 +82,10 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
protected abstract FileAppender<ManifestEntry<F>> newAppender(
PartitionSpec spec, OutputFile outputFile);
+ /**
+ * Returns the writer properties this manifest writer was constructed with.
+ *
+ * <p>NOTE(review): the map is returned as stored, without copying -- callers presumably must not
+ * modify it; confirm.
+ *
+ * @return the writer configuration properties passed to the constructor
+ */
+ protected Map<String, String> writerProperties() {
+ return writerProperties;
+ }
+
protected ManifestContent content() {
return ManifestContent.DATA;
}
@@ -238,8 +249,13 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V4Writer extends ManifestWriter<DataFile> {
private final V4Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
- V4Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId,
Long firstRowId) {
- super(spec, file, snapshotId, firstRowId);
+ V4Writer(
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Long firstRowId,
+ Map<String, String> writerProperties) {
+ super(spec, file, snapshotId, firstRowId, writerProperties);
this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -261,6 +277,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "4")
.meta("content", "data")
+ .set(writerProperties())
.overwrite()
.build();
} catch (IOException e) {
@@ -273,8 +290,12 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V4DeleteWriter extends ManifestWriter<DeleteFile> {
private final V4Metadata.ManifestEntryWrapper<DeleteFile> entryWrapper;
- V4DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long
snapshotId) {
- super(spec, file, snapshotId, null);
+ V4DeleteWriter(
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Map<String, String> writerProperties) {
+ super(spec, file, snapshotId, null, writerProperties);
this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -296,6 +317,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "4")
.meta("content", "deletes")
+ .set(writerProperties())
.overwrite()
.build();
} catch (IOException e) {
@@ -313,8 +335,13 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V3Writer extends ManifestWriter<DataFile> {
private final V3Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
- V3Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId,
Long firstRowId) {
- super(spec, file, snapshotId, firstRowId);
+ V3Writer(
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Long firstRowId,
+ Map<String, String> writerProperties) {
+ super(spec, file, snapshotId, firstRowId, writerProperties);
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -336,6 +363,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "3")
.meta("content", "data")
+ .set(writerProperties())
.overwrite()
.build();
} catch (IOException e) {
@@ -348,8 +376,12 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V3DeleteWriter extends ManifestWriter<DeleteFile> {
private final V3Metadata.ManifestEntryWrapper<DeleteFile> entryWrapper;
- V3DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long
snapshotId) {
- super(spec, file, snapshotId, null);
+ V3DeleteWriter(
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Map<String, String> writerProperties) {
+ super(spec, file, snapshotId, null, writerProperties);
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -371,6 +403,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "3")
.meta("content", "deletes")
+ .set(writerProperties())
.overwrite()
.build();
} catch (IOException e) {
@@ -388,8 +421,12 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V2Writer extends ManifestWriter<DataFile> {
private final V2Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
- V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
- super(spec, file, snapshotId, null);
+ V2Writer(
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Map<String, String> writerProperties) {
+ super(spec, file, snapshotId, null, writerProperties);
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -411,6 +448,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "2")
.meta("content", "data")
+ .set(writerProperties())
.overwrite()
.build();
} catch (IOException e) {
@@ -423,8 +461,12 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
private final V2Metadata.ManifestEntryWrapper<DeleteFile> entryWrapper;
- V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long
snapshotId) {
- super(spec, file, snapshotId, null);
+ V2DeleteWriter(
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Map<String, String> writerProperties) {
+ super(spec, file, snapshotId, null, writerProperties);
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -446,6 +488,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "2")
.meta("content", "deletes")
+ .set(writerProperties())
.overwrite()
.build();
} catch (IOException e) {
@@ -463,8 +506,12 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
static class V1Writer extends ManifestWriter<DataFile> {
private final V1Metadata.ManifestEntryWrapper entryWrapper;
- V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
- super(spec, file, snapshotId, null);
+ V1Writer(
+ PartitionSpec spec,
+ EncryptedOutputFile file,
+ Long snapshotId,
+ Map<String, String> writerProperties) {
+ super(spec, file, snapshotId, null, writerProperties);
this.entryWrapper = new V1Metadata.ManifestEntryWrapper();
}
@@ -485,6 +532,7 @@ public abstract class ManifestWriter<F extends
ContentFile<F>> implements FileAp
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "1")
+ .set(writerProperties())
.overwrite()
.build();
} catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 1fd7dab62f..6ba10e8049 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -113,6 +113,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
private final AtomicInteger attempt = new AtomicInteger(0);
private final List<String> manifestLists = Lists.newArrayList();
private final long targetManifestSizeBytes;
+ private final Map<String, String> manifestWriterProps;
private MetricsReporter reporter = LoggingMetricsReporter.instance();
private volatile Long snapshotId = null;
private TableMetadata base;
@@ -141,6 +142,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
this.targetManifestSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES,
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+ this.manifestWriterProps = manifestWriterProperties(ops.current());
boolean snapshotIdInheritanceEnabled =
ops.current()
.propertyAsBoolean(
@@ -608,12 +610,20 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
return ManifestFiles.write(
- ops.current().formatVersion(), spec, newManifestOutputFile(),
snapshotId());
+ ops.current().formatVersion(),
+ spec,
+ newManifestOutputFile(),
+ snapshotId(),
+ manifestWriterProps);
}
protected ManifestWriter<DeleteFile> newDeleteManifestWriter(PartitionSpec
spec) {
return ManifestFiles.writeDeleteManifest(
- ops.current().formatVersion(), spec, newManifestOutputFile(),
snapshotId());
+ ops.current().formatVersion(),
+ spec,
+ newManifestOutputFile(),
+ snapshotId(),
+ manifestWriterProps);
}
protected RollingManifestWriter<DataFile>
newRollingManifestWriter(PartitionSpec spec) {
@@ -625,6 +635,25 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
() -> newDeleteManifestWriter(spec), targetManifestSizeBytes);
}
+  /**
+   * Translates the table's manifest compression settings into Avro writer properties.
+   *
+   * <p>The manifest codec is always included under {@link TableProperties#AVRO_COMPRESSION}. The
+   * compression level is included under {@link TableProperties#AVRO_COMPRESSION_LEVEL} only when
+   * a non-null value is resolved for it.
+   *
+   * @param metadata table metadata whose properties are read
+   * @return an immutable map of writer properties for manifest writers
+   */
+  private static Map<String, String> manifestWriterProperties(TableMetadata metadata) {
+    String compressionCodec =
+        metadata.property(
+            TableProperties.MANIFEST_COMPRESSION, TableProperties.MANIFEST_COMPRESSION_DEFAULT);
+    String compressionLevel =
+        metadata.property(
+            TableProperties.MANIFEST_COMPRESSION_LEVEL,
+            TableProperties.MANIFEST_COMPRESSION_LEVEL_DEFAULT);
+
+    ImmutableMap.Builder<String, String> props = ImmutableMap.builder();
+    props.put(TableProperties.AVRO_COMPRESSION, compressionCodec);
+    if (compressionLevel != null) {
+      props.put(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel);
+    }
+
+    return props.build();
+  }
+
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
}
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 32dc758ab0..1f778984af 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -191,6 +191,12 @@ public class TableProperties {
public static final String DELETE_AVRO_COMPRESSION_LEVEL =
"write.delete.avro.compression-level";
public static final String AVRO_COMPRESSION_LEVEL_DEFAULT = null;
+ // Table property naming the compression codec for manifest files, and its default value.
+ public static final String MANIFEST_COMPRESSION =
"write.manifest.compression-codec";
+ public static final String MANIFEST_COMPRESSION_DEFAULT = "gzip";
+
+ // Table property naming the compression level for manifest files; the default is null.
+ public static final String MANIFEST_COMPRESSION_LEVEL =
"write.manifest.compression-level";
+ public static final String MANIFEST_COMPRESSION_LEVEL_DEFAULT = null;
+
public static final String ORC_STRIPE_SIZE_BYTES =
"write.orc.stripe-size-bytes";
public static final String ORC_BLOOM_FILTER_COLUMNS =
"write.orc.bloom.filter.columns";
diff --git
a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index fdd9cc33a4..5e83827f0c 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -18,11 +18,13 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
@@ -313,6 +315,35 @@ public class TestManifestWriterVersions {
checkRewrittenEntry(readManifest(manifest3), 0L, FileContent.DATA,
FIRST_ROW_ID);
}
+  /** Verifies the codec recorded in a manifest written without any writer properties. */
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  public void testDefaultManifestCompression(int formatVersion) throws IOException {
+    String fileName = "default-v" + formatVersion + ".avro";
+    File manifestFile = temp.resolve(fileName).toFile();
+    OutputFile manifestOutput = Files.localOutput(manifestFile);
+
+    try (ManifestWriter<DataFile> manifestWriter =
+        ManifestFiles.write(formatVersion, SPEC, manifestOutput, SNAPSHOT_ID)) {
+      manifestWriter.add(DATA_FILE);
+    }
+
+    // no writer properties were passed; the file is expected to record the "deflate" codec
+    String codec = readAvroCodec(manifestFile);
+    assertThat(codec).isEqualTo("deflate");
+  }
+
+  /** Verifies that a codec passed through writer properties is recorded in the manifest. */
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  public void testCustomManifestCompression(int formatVersion) throws IOException {
+    String fileName = "snappy-v" + formatVersion + ".avro";
+    File manifestFile = temp.resolve(fileName).toFile();
+    OutputFile manifestOutput = Files.localOutput(manifestFile);
+    Map<String, String> snappyProps = ImmutableMap.of(TableProperties.AVRO_COMPRESSION, "snappy");
+
+    try (ManifestWriter<DataFile> manifestWriter =
+        ManifestFiles.write(formatVersion, SPEC, manifestOutput, SNAPSHOT_ID, snappyProps)) {
+      manifestWriter.add(DATA_FILE);
+    }
+
+    String codec = readAvroCodec(manifestFile);
+    assertThat(codec).isEqualTo("snappy");
+  }
+
void checkEntry(
ManifestEntry<?> entry,
Long expectedDataSequenceNumber,
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index b6c0ab65e2..dd97738759 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
import static org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
+import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -224,4 +225,22 @@ public class TestSnapshotProducer extends TestBase {
}
};
}
+
+  /** Verifies the codec recorded in a manifest committed without a compression property set. */
+  @TestTemplate
+  public void testDefaultManifestCompression() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot committed = table.currentSnapshot();
+    ManifestFile firstManifest = committed.dataManifests(table.io()).get(0);
+    File manifestOnDisk = new File(firstManifest.path());
+
+    assertThat(readAvroCodec(manifestOnDisk)).isEqualTo("deflate");
+  }
+
+  /** Verifies that the manifest compression table property is applied to committed manifests. */
+  @TestTemplate
+  public void testManifestCompressionFromTableProperty() throws IOException {
+    // the property is committed before the append so that the append sees it
+    table.updateProperties().set(TableProperties.MANIFEST_COMPRESSION, "snappy").commit();
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot committed = table.currentSnapshot();
+    ManifestFile firstManifest = committed.dataManifests(table.io()).get(0);
+    File manifestOnDisk = new File(firstManifest.path());
+
+    assertThat(readAvroCodec(manifestOnDisk)).isEqualTo("snappy");
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
index 56efc4cd3e..0a1cf43f4f 100644
--- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
+++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
@@ -21,18 +21,23 @@ package org.apache.iceberg.avro;
import static org.apache.iceberg.avro.AvroSchemaUtil.toOption;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantTestUtil;
-class AvroTestHelpers {
+public class AvroTestHelpers {
private AvroTestHelpers() {}
@@ -164,4 +169,12 @@ class AvroTestHelpers {
throw new IllegalArgumentException("Not a supported type: " + type);
}
}
+
+ /** Reads the {@code avro.codec} metadata value from an Avro data file. */
+ public static String readAvroCodec(File file) throws IOException {
+ try (DataFileReader<?> reader =
+ new DataFileReader<>(new SeekableFileInput(file), new
GenericDatumReader<>())) {
+ return reader.getMetaString("avro.codec");
+ }
+ }
}