This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 08ac7844d2 Core: Propagate Avro compression settings to manifest writers (#15652)
08ac7844d2 is described below

commit 08ac7844d2e223c3c4209f2f941acf4dd55c1bfb
Author: Russell Spitzer <[email protected]>
AuthorDate: Fri Mar 20 12:04:20 2026 -0500

    Core: Propagate Avro compression settings to manifest writers (#15652)
---
 .../java/org/apache/iceberg/ManifestBenchmark.java | 279 +++++++++++++++++++++
 .../main/java/org/apache/iceberg/InternalData.java |  14 ++
 .../java/org/apache/iceberg/ManifestFiles.java     | 116 ++++++++-
 .../java/org/apache/iceberg/ManifestWriter.java    |  78 ++++--
 .../java/org/apache/iceberg/SnapshotProducer.java  |  33 ++-
 .../java/org/apache/iceberg/TableProperties.java   |   6 +
 .../apache/iceberg/TestManifestWriterVersions.java |  31 +++
 .../org/apache/iceberg/TestSnapshotProducer.java   |  19 ++
 .../org/apache/iceberg/avro/AvroTestHelpers.java   |  15 +-
 9 files changed, 566 insertions(+), 25 deletions(-)

diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java 
b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
new file mode 100644
index 0000000000..cbd372b7a4
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that measures manifest read/write performance across compression codecs.
+ *
+ * <p>Entry counts are calibrated per column count via {@link #ENTRY_BASE}. Set to 300_000 for ~8 MB
+ * manifests (matching the default {@code commit.manifest.target-size-bytes}) or 15_000 for ~400 KB.
+ *
+ * <p>To run this benchmark:
+ *
+ * <pre>{@code
+ * # all combinations
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
+ *
+ * # single codec
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ *     -PjmhParams="codec=gzip"
+ * }</pre>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 6)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class ManifestBenchmark {
+
+  /** Divided by {@code numCols} to derive the number of manifest entries in each trial. */
+  static final int ENTRY_BASE = 300_000;
+
+  private static final int FORMAT_VERSION = 4;
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()),
+          Types.NestedField.required(3, "customer", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
+
+  @Param({"gzip", "snappy", "zstd", "uncompressed"})
+  private String codec;
+
+  @Param({"true", "false"})
+  private boolean partitioned;
+
+  @Param({"10", "50", "100"})
+  private int numCols;
+
+  private PartitionSpec spec;
+  private Map<Integer, PartitionSpec> specsById;
+  private Map<String, String> writerProperties;
+  private List<DataFile> dataFiles;
+  private int numEntries;
+  private TestTables.LocalFileIO fileIO;
+
+  private String writeBaseDir;
+  private OutputFile writeOutputFile;
+
+  private String readBaseDir;
+  private ManifestFile readManifest;
+
+  @Setup(Level.Trial)
+  public void setupTrial() {
+    this.spec = partitioned ? SPEC : PartitionSpec.unpartitioned();
+    this.specsById = Map.of(spec.specId(), spec);
+    this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
+    // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 → ~400 KB manifests
+    this.numEntries = ENTRY_BASE / numCols;
+    this.dataFiles = generateDataFiles();
+    this.fileIO = new TestTables.LocalFileIO();
+    setupReadManifest();
+  }
+
+  /** Points each write invocation at a new file inside a fresh temporary directory. */
+  @Setup(Level.Invocation)
+  public void setupWriteInvocation() throws IOException {
+    this.writeBaseDir = Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+    this.writeOutputFile =
+        org.apache.iceberg.Files.localOutput(
+            String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDownTrial() {
+    cleanDir(readBaseDir);
+    readBaseDir = null;
+    readManifest = null;
+    dataFiles = null;
+    fileIO = null;
+  }
+
+  @TearDown(Level.Invocation)
+  public void tearDownInvocation() {
+    cleanDir(writeBaseDir);
+    writeBaseDir = null;
+    writeOutputFile = null;
+  }
+
+  /** Per-invocation auxiliary counter reporting the written manifest size in MiB. */
+  @AuxCounters(AuxCounters.Type.EVENTS)
+  @State(Scope.Thread)
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public static class FileSizeCounters {
+    public double manifestSizeMB;
+
+    @Setup(Level.Invocation)
+    public void reset() {
+      manifestSizeMB = 0;
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public ManifestFile writeManifest(FileSizeCounters counters) throws IOException {
+    ManifestFile manifest = writeDataFiles(writeOutputFile);
+    counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
+    return manifest;
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readManifest(Blackhole blackhole) throws IOException {
+    try (CloseableIterator<DataFile> it =
+        ManifestFiles.read(readManifest, fileIO, specsById).iterator()) {
+      while (it.hasNext()) {
+        blackhole.consume(it.next());
+      }
+    }
+  }
+
+  private void setupReadManifest() {
+    try {
+      this.readBaseDir = Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+      this.readManifest =
+          writeDataFiles(
+              org.apache.iceberg.Files.localOutput(
+                  String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir)));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Writes every entry of {@code dataFiles} to a manifest using the trial's writer properties.
+   *
+   * @param outputFile the location to write the manifest to
+   * @return the {@link ManifestFile} describing the written manifest
+   * @throws IOException if the manifest writer cannot be closed
+   */
+  private ManifestFile writeDataFiles(OutputFile outputFile) throws IOException {
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(FORMAT_VERSION, spec, outputFile, 1L, writerProperties);
+
+    try (ManifestWriter<DataFile> w = writer) {
+      for (DataFile file : dataFiles) {
+        w.add(file);
+      }
+    }
+
+    // build the ManifestFile only after the writer has been closed
+    return writer.toManifestFile();
+  }
+
+  /** Generates {@code numEntries} data files; adds partition paths for partitioned specs. */
+  private List<DataFile> generateDataFiles() {
+    Random random = new Random(42);
+    List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
+    for (int i = 0; i < numEntries; i++) {
+      DataFiles.Builder builder =
+          DataFiles.builder(spec)
+              .withFormat(FileFormat.PARQUET)
+              .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", i))
+              .withFileSizeInBytes(1024 + i)
+              .withRecordCount(1000 + i)
+              .withMetrics(randomMetrics(random, numCols));
+
+      if (!spec.isUnpartitioned()) {
+        builder.withPartitionPath(
+            String.format(
+                Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i % 50, i % 200));
+      }
+
+      files.add(builder.build());
+    }
+
+    return files;
+  }
+
+  /** Builds random metrics (sizes, counts, 8-byte bounds) for column ids 0 to cols - 1. */
+  static Metrics randomMetrics(Random random, int cols) {
+    long rowCount = 100_000L + random.nextInt(1000);
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+    Map<Integer, Long> valueCounts = Maps.newHashMap();
+    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
+    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+    for (int i = 0; i < cols; i++) {
+      columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
+      valueCounts.put(i, 100_000L + random.nextInt(100));
+      nullValueCounts.put(i, (long) random.nextInt(5));
+      nanValueCounts.put(i, (long) random.nextInt(5));
+      byte[] lower = new byte[8];
+      random.nextBytes(lower);
+      lowerBounds.put(i, ByteBuffer.wrap(lower));
+      byte[] upper = new byte[8];
+      random.nextBytes(upper);
+      upperBounds.put(i, ByteBuffer.wrap(upper));
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        valueCounts,
+        nullValueCounts,
+        nanValueCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
+  private static void cleanDir(String dir) {
+    if (dir != null) {
+      FileUtils.deleteQuietly(new File(dir));
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java 
b/core/src/main/java/org/apache/iceberg/InternalData.java
index fa39d23e43..040905f032 100644
--- a/core/src/main/java/org/apache/iceberg/InternalData.java
+++ b/core/src/main/java/org/apache/iceberg/InternalData.java
@@ -114,6 +114,20 @@ public class InternalData {
      */
     WriteBuilder set(String property, String value);
 
+    /**
+     * Set multiple writer configuration properties at once.
+     *
+     * <p>Each entry is applied with {@link #set(String, String)}. File metadata properties belong
+     * in {@link #meta(Map)} instead.
+     *
+     * @param properties writer config properties to apply
+     * @return this for method chaining
+     */
+    default WriteBuilder set(Map<String, String> properties) {
+      properties.forEach((key, value) -> set(key, value));
+      return this;
+    }
+
     /**
      * Set a file metadata property.
      *
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java 
b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 63cee1e856..ffeff9c991 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.iceberg.ManifestReader.FileType;
 import org.apache.iceberg.avro.AvroEncoderUtil;
@@ -211,6 +212,25 @@ public class ManifestFiles {
     return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId, 
null);
   }
 
+  /**
+   * Create a data {@link ManifestWriter} with custom writer properties and a null first row ID.
+   *
+   * @param formatVersion a target format version, which selects the writer implementation
+   * @param spec a {@link PartitionSpec}
+   * @param encryptedOutputFile an {@link EncryptedOutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param writerProperties file writer config, e.g. {@link TableProperties#AVRO_COMPRESSION}
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DataFile> write(
+      int formatVersion,
+      PartitionSpec spec,
+      EncryptedOutputFile encryptedOutputFile,
+      Long snapshotId,
+      Map<String, String> writerProperties) {
+    return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId, null, writerProperties);
+  }
+
   /**
    * Create a new {@link ManifestWriter} for the given format version.
    *
@@ -227,15 +247,54 @@ public class ManifestFiles {
       EncryptedOutputFile encryptedOutputFile,
       Long snapshotId,
       Long firstRowId) {
+    return newWriter(
+        formatVersion, spec, encryptedOutputFile, snapshotId, firstRowId, 
Collections.emptyMap());
+  }
+
+  /**
+   * Create a data {@link ManifestWriter} that writes to a plaintext {@link OutputFile} and passes
+   * custom properties through to the underlying file writer.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param writerProperties properties passed through to the underlying file writer
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DataFile> write(
+      int formatVersion,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      Long snapshotId,
+      Map<String, String> writerProperties) {
+    return write(
+        formatVersion,
+        spec,
+        EncryptedFiles.plainAsEncryptedOutput(outputFile),
+        snapshotId,
+        writerProperties);
+  }
+
+  @VisibleForTesting
+  static ManifestWriter<DataFile> newWriter(
+      int formatVersion,
+      PartitionSpec spec,
+      EncryptedOutputFile encryptedOutputFile,
+      Long snapshotId,
+      Long firstRowId,
+      Map<String, String> writerProperties) {
     switch (formatVersion) {
       case 1:
-        return new ManifestWriter.V1Writer(spec, encryptedOutputFile, 
snapshotId);
+        return new ManifestWriter.V1Writer(spec, encryptedOutputFile, 
snapshotId, writerProperties);
       case 2:
-        return new ManifestWriter.V2Writer(spec, encryptedOutputFile, 
snapshotId);
+        return new ManifestWriter.V2Writer(spec, encryptedOutputFile, 
snapshotId, writerProperties);
       case 3:
-        return new ManifestWriter.V3Writer(spec, encryptedOutputFile, 
snapshotId, firstRowId);
+        return new ManifestWriter.V3Writer(
+            spec, encryptedOutputFile, snapshotId, firstRowId, 
writerProperties);
       case 4:
-        return new ManifestWriter.V4Writer(spec, encryptedOutputFile, 
snapshotId, firstRowId);
+        return new ManifestWriter.V4Writer(
+            spec, encryptedOutputFile, snapshotId, firstRowId, 
writerProperties);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest for table version: " + formatVersion);
@@ -276,6 +335,30 @@ public class ManifestFiles {
         formatVersion, spec, 
EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
   }
 
+  /**
+   * Create a {@link ManifestWriter} for delete files with custom writer properties (not v1).
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile a plaintext {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param writerProperties file writer config, e.g. {@link TableProperties#AVRO_COMPRESSION}
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(
+      int formatVersion,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      Long snapshotId,
+      Map<String, String> writerProperties) {
+    return writeDeleteManifest(
+        formatVersion,
+        spec,
+        EncryptedFiles.plainAsEncryptedOutput(outputFile),
+        snapshotId,
+        writerProperties);
+  }
+
   /**
    * Create a new {@link ManifestWriter} for the given format version.
    *
@@ -287,15 +370,34 @@ public class ManifestFiles {
    */
   public static ManifestWriter<DeleteFile> writeDeleteManifest(
       int formatVersion, PartitionSpec spec, EncryptedOutputFile outputFile, 
Long snapshotId) {
+    return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, 
Collections.emptyMap());
+  }
+
+  /**
+   * Create a new {@link ManifestWriter} for the given format version with 
custom writer properties.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link EncryptedOutputFile} where the manifest will 
be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an 
inherited ID
+   * @param writerProperties properties passed through to the underlying file 
writer
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(
+      int formatVersion,
+      PartitionSpec spec,
+      EncryptedOutputFile outputFile,
+      Long snapshotId,
+      Map<String, String> writerProperties) {
     switch (formatVersion) {
       case 1:
         throw new IllegalArgumentException("Cannot write delete files in a v1 
table");
       case 2:
-        return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
+        return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId, 
writerProperties);
       case 3:
-        return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId);
+        return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId, 
writerProperties);
       case 4:
-        return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId);
+        return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId, 
writerProperties);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java 
b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 43b8e3ed70..7d85f991b0 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
@@ -47,6 +48,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   private final GenericManifestEntry<F> reused;
   private final PartitionSummary stats;
   private final Long firstRowId;
+  private final Map<String, String> writerProperties;
 
   private boolean closed = false;
   private int addedFiles = 0;
@@ -58,9 +60,14 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   private Long minDataSequenceNumber = null;
 
   private ManifestWriter(
-      PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, Long 
firstRowId) {
+      PartitionSpec spec,
+      EncryptedOutputFile file,
+      Long snapshotId,
+      Long firstRowId,
+      Map<String, String> writerProperties) {
     this.file = file.encryptingOutputFile();
     this.specId = spec.specId();
+    this.writerProperties = writerProperties;
     this.writer = newAppender(spec, this.file);
     this.snapshotId = snapshotId;
     this.reused =
@@ -75,6 +82,10 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   protected abstract FileAppender<ManifestEntry<F>> newAppender(
       PartitionSpec spec, OutputFile outputFile);
 
+  protected Map<String, String> writerProperties() {
+    return writerProperties; // NOTE(review): stored as given by the caller (no copy, no null check)
+  }
+
   protected ManifestContent content() {
     return ManifestContent.DATA;
   }
@@ -238,8 +249,13 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   static class V4Writer extends ManifestWriter<DataFile> {
     private final V4Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
 
-    V4Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, 
Long firstRowId) {
-      super(spec, file, snapshotId, firstRowId);
+    V4Writer(
+        PartitionSpec spec,
+        EncryptedOutputFile file,
+        Long snapshotId,
+        Long firstRowId,
+        Map<String, String> writerProperties) {
+      super(spec, file, snapshotId, firstRowId, writerProperties);
       this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -261,6 +277,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "4")
             .meta("content", "data")
+            .set(writerProperties())
             .overwrite()
             .build();
       } catch (IOException e) {
@@ -273,8 +290,12 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   static class V4DeleteWriter extends ManifestWriter<DeleteFile> {
     private final V4Metadata.ManifestEntryWrapper<DeleteFile> entryWrapper;
 
-    V4DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long 
snapshotId) {
-      super(spec, file, snapshotId, null);
+    V4DeleteWriter(
+        PartitionSpec spec,
+        EncryptedOutputFile file,
+        Long snapshotId,
+        Map<String, String> writerProperties) {
+      super(spec, file, snapshotId, null, writerProperties);
       this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -296,6 +317,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "4")
             .meta("content", "deletes")
+            .set(writerProperties())
             .overwrite()
             .build();
       } catch (IOException e) {
@@ -313,8 +335,13 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   static class V3Writer extends ManifestWriter<DataFile> {
     private final V3Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
 
-    V3Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, 
Long firstRowId) {
-      super(spec, file, snapshotId, firstRowId);
+    V3Writer(
+        PartitionSpec spec,
+        EncryptedOutputFile file,
+        Long snapshotId,
+        Long firstRowId,
+        Map<String, String> writerProperties) {
+      super(spec, file, snapshotId, firstRowId, writerProperties);
       this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -336,6 +363,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "3")
             .meta("content", "data")
+            .set(writerProperties())
             .overwrite()
             .build();
       } catch (IOException e) {
@@ -348,8 +376,12 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   static class V3DeleteWriter extends ManifestWriter<DeleteFile> {
     private final V3Metadata.ManifestEntryWrapper<DeleteFile> entryWrapper;
 
-    V3DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long 
snapshotId) {
-      super(spec, file, snapshotId, null);
+    V3DeleteWriter(
+        PartitionSpec spec,
+        EncryptedOutputFile file,
+        Long snapshotId,
+        Map<String, String> writerProperties) {
+      super(spec, file, snapshotId, null, writerProperties);
       this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -371,6 +403,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "3")
             .meta("content", "deletes")
+            .set(writerProperties())
             .overwrite()
             .build();
       } catch (IOException e) {
@@ -388,8 +421,12 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   static class V2Writer extends ManifestWriter<DataFile> {
     private final V2Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
 
-    V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
-      super(spec, file, snapshotId, null);
+    V2Writer(
+        PartitionSpec spec,
+        EncryptedOutputFile file,
+        Long snapshotId,
+        Map<String, String> writerProperties) {
+      super(spec, file, snapshotId, null, writerProperties);
       this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -411,6 +448,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "2")
             .meta("content", "data")
+            .set(writerProperties())
             .overwrite()
             .build();
       } catch (IOException e) {
@@ -423,8 +461,12 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
     private final V2Metadata.ManifestEntryWrapper<DeleteFile> entryWrapper;
 
-    V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long 
snapshotId) {
-      super(spec, file, snapshotId, null);
+    V2DeleteWriter(
+        PartitionSpec spec,
+        EncryptedOutputFile file,
+        Long snapshotId,
+        Map<String, String> writerProperties) {
+      super(spec, file, snapshotId, null, writerProperties);
       this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
     }
 
@@ -446,6 +488,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "2")
             .meta("content", "deletes")
+            .set(writerProperties())
             .overwrite()
             .build();
       } catch (IOException e) {
@@ -463,8 +506,12 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
   static class V1Writer extends ManifestWriter<DataFile> {
     private final V1Metadata.ManifestEntryWrapper entryWrapper;
 
-    V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
-      super(spec, file, snapshotId, null);
+    V1Writer(
+        PartitionSpec spec,
+        EncryptedOutputFile file,
+        Long snapshotId,
+        Map<String, String> writerProperties) {
+      super(spec, file, snapshotId, null, writerProperties);
       this.entryWrapper = new V1Metadata.ManifestEntryWrapper();
     }
 
@@ -485,6 +532,7 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
             .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "1")
+            .set(writerProperties())
             .overwrite()
             .build();
       } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 1fd7dab62f..6ba10e8049 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -113,6 +113,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
   private final AtomicInteger attempt = new AtomicInteger(0);
   private final List<String> manifestLists = Lists.newArrayList();
   private final long targetManifestSizeBytes;
+  private final Map<String, String> manifestWriterProps;
   private MetricsReporter reporter = LoggingMetricsReporter.instance();
   private volatile Long snapshotId = null;
   private TableMetadata base;
@@ -141,6 +142,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
     this.targetManifestSizeBytes =
         ops.current()
             .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, 
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+    this.manifestWriterProps = manifestWriterProperties(ops.current());
     boolean snapshotIdInheritanceEnabled =
         ops.current()
             .propertyAsBoolean(
@@ -608,12 +610,20 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
 
   protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
     return ManifestFiles.write(
-        ops.current().formatVersion(), spec, newManifestOutputFile(), 
snapshotId());
+        ops.current().formatVersion(),
+        spec,
+        newManifestOutputFile(),
+        snapshotId(),
+        manifestWriterProps);
   }
 
   protected ManifestWriter<DeleteFile> newDeleteManifestWriter(PartitionSpec 
spec) {
     return ManifestFiles.writeDeleteManifest(
-        ops.current().formatVersion(), spec, newManifestOutputFile(), 
snapshotId());
+        ops.current().formatVersion(),
+        spec,
+        newManifestOutputFile(),
+        snapshotId(),
+        manifestWriterProps);
   }
 
   protected RollingManifestWriter<DataFile> 
newRollingManifestWriter(PartitionSpec spec) {
@@ -625,6 +635,25 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
         () -> newDeleteManifestWriter(spec), targetManifestSizeBytes);
   }
 
+  private static Map<String, String> manifestWriterProperties(TableMetadata metadata) {
+    // manifests use their own compression properties, mapped onto the Avro writer's keys
+    String manifestCodec =
+        metadata.property(
+            TableProperties.MANIFEST_COMPRESSION, TableProperties.MANIFEST_COMPRESSION_DEFAULT);
+    String manifestLevel =
+        metadata.property(
+            TableProperties.MANIFEST_COMPRESSION_LEVEL,
+            TableProperties.MANIFEST_COMPRESSION_LEVEL_DEFAULT);
+    if (manifestLevel == null) {
+      return ImmutableMap.of(TableProperties.AVRO_COMPRESSION, manifestCodec);
+    }
+    return ImmutableMap.of(
+        TableProperties.AVRO_COMPRESSION,
+        manifestCodec,
+        TableProperties.AVRO_COMPRESSION_LEVEL,
+        manifestLevel);
+  }
+
   protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
     return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
   }
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java 
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 32dc758ab0..1f778984af 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -191,6 +191,12 @@ public class TableProperties {
   public static final String DELETE_AVRO_COMPRESSION_LEVEL = 
"write.delete.avro.compression-level";
   public static final String AVRO_COMPRESSION_LEVEL_DEFAULT = null;
 
+  // Compression codec for manifest files; SnapshotProducer forwards it to the manifest
+  // writer as the Avro compression codec. The "gzip" default shows up as "deflate" in the
+  // Avro file header.
+  public static final String MANIFEST_COMPRESSION = 
"write.manifest.compression-codec";
+  public static final String MANIFEST_COMPRESSION_DEFAULT = "gzip";
+
+  // Optional compression level for manifest files; not forwarded unless set (default null).
+  public static final String MANIFEST_COMPRESSION_LEVEL = 
"write.manifest.compression-level";
+  public static final String MANIFEST_COMPRESSION_LEVEL_DEFAULT = null;
+
   public static final String ORC_STRIPE_SIZE_BYTES = 
"write.orc.stripe-size-bytes";
 
   public static final String ORC_BLOOM_FILTER_COLUMNS = 
"write.orc.bloom.filter.columns";
diff --git 
a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java 
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index fdd9cc33a4..5e83827f0c 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -18,11 +18,13 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
@@ -313,6 +315,35 @@ public class TestManifestWriterVersions {
     checkRewrittenEntry(readManifest(manifest3), 0L, FileContent.DATA, 
FIRST_ROW_ID);
   }
 
+  /** Manifests written without writer properties are expected to use the Avro deflate codec. */
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  public void testDefaultManifestCompression(int formatVersion) throws 
IOException {
+    File manifestFile = temp.resolve("default-v" + formatVersion + 
".avro").toFile();
+    OutputFile outputFile = Files.localOutput(manifestFile);
+
+    // Uses the overload without a properties map, so only defaults apply.
+    try (ManifestWriter<DataFile> writer =
+        ManifestFiles.write(formatVersion, SPEC, outputFile, SNAPSHOT_ID)) {
+      writer.add(DATA_FILE);
+    }
+
+    assertThat(readAvroCodec(manifestFile)).isEqualTo("deflate");
+  }
+
+  /** An explicit Avro compression writer property is honored for manifests of every version. */
+  @ParameterizedTest
+  @FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
+  public void testCustomManifestCompression(int formatVersion) throws 
IOException {
+    Map<String, String> props = 
ImmutableMap.of(TableProperties.AVRO_COMPRESSION, "snappy");
+    File manifestFile = temp.resolve("snappy-v" + formatVersion + 
".avro").toFile();
+    OutputFile outputFile = Files.localOutput(manifestFile);
+
+    try (ManifestWriter<DataFile> writer =
+        ManifestFiles.write(formatVersion, SPEC, outputFile, SNAPSHOT_ID, 
props)) {
+      writer.add(DATA_FILE);
+    }
+
+    assertThat(readAvroCodec(manifestFile)).isEqualTo("snappy");
+  }
+
   void checkEntry(
       ManifestEntry<?> entry,
       Long expectedDataSequenceNumber,
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java 
b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index b6c0ab65e2..dd97738759 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg;
 
 import static org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
+import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -224,4 +225,22 @@ public class TestSnapshotProducer extends TestBase {
       }
     };
   }
+
+  /** Without a manifest compression table property, committed manifests use deflate. */
+  @TestTemplate
+  public void testDefaultManifestCompression() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    ManifestFile manifest = 
table.currentSnapshot().dataManifests(table.io()).get(0);
+    // NOTE(review): new File(manifest.path()) assumes the test table writes to the local FS.
+    assertThat(readAvroCodec(new File(manifest.path()))).isEqualTo("deflate");
+  }
+
+  /** Setting write.manifest.compression-codec on the table changes the codec of new manifests. */
+  @TestTemplate
+  public void testManifestCompressionFromTableProperty() throws IOException {
+    table.updateProperties().set(TableProperties.MANIFEST_COMPRESSION, 
"snappy").commit();
+
+    // The property must be committed before the append so the producer sees it.
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    ManifestFile manifest = 
table.currentSnapshot().dataManifests(table.io()).get(0);
+    assertThat(readAvroCodec(new File(manifest.path()))).isEqualTo("snappy");
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java 
b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
index 56efc4cd3e..0a1cf43f4f 100644
--- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
+++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
@@ -21,18 +21,23 @@ package org.apache.iceberg.avro;
 import static org.apache.iceberg.avro.AvroSchemaUtil.toOption;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableFileInput;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.variants.Variant;
 import org.apache.iceberg.variants.VariantTestUtil;
 
-class AvroTestHelpers {
+public class AvroTestHelpers {
 
   private AvroTestHelpers() {}
 
@@ -164,4 +169,12 @@ class AvroTestHelpers {
         throw new IllegalArgumentException("Not a supported type: " + type);
     }
   }
+
+  /** Reads the {@code avro.codec} metadata value from an Avro data file. */
+  public static String readAvroCodec(File file) throws IOException {
+    try (DataFileReader<?> reader =
+        new DataFileReader<>(new SeekableFileInput(file), new 
GenericDatumReader<>())) {
+      return reader.getMetaString("avro.codec");
+    }
+  }
 }


Reply via email to