This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ed03c646a Spark 3.4: Fix write and SQL options to override delete file compression config (#8438)
3ed03c646a is described below

commit 3ed03c646ada141944c217fd9bff3d9671f7a00c
Author: roryqi <[email protected]>
AuthorDate: Thu Sep 7 09:16:50 2023 +0800

    Spark 3.4: Fix write and SQL options to override delete file compression config (#8438)
---
 .../org/apache/iceberg/spark/SparkWriteConf.java   | 200 ++++++++++++---
 .../spark/source/SparkPositionDeletesRewrite.java  |   3 +-
 .../spark/source/SparkPositionDeltaWrite.java      |   2 +-
 .../apache/iceberg/spark/source/SparkWrite.java    |   2 +-
 .../apache/iceberg/spark/TestSparkWriteConf.java   | 273 +++++++++++++++++++++
 .../spark/source/TestCompressionSettings.java      |   6 +
 6 files changed, 447 insertions(+), 39 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index ec9825d40b..df3e2051f7 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -23,6 +23,12 @@ import static org.apache.iceberg.DistributionMode.NONE;
 import static org.apache.iceberg.DistributionMode.RANGE;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
@@ -425,7 +431,107 @@ public class SparkWriteConf {
     return branch;
   }
 
-  public String parquetCompressionCodec() {
+  public Map<String, String> writeProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    writeProperties.putAll(dataWriteProperties());
+    writeProperties.putAll(deleteWriteProperties());
+    return writeProperties;
+  }
+
+  private Map<String, String> dataWriteProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    FileFormat dataFormat = dataFileFormat();
+
+    switch (dataFormat) {
+      case PARQUET:
+        writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
+        String parquetCompressionLevel = parquetCompressionLevel();
+        if (parquetCompressionLevel != null) {
+          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
+        }
+        break;
+
+      case AVRO:
+        writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
+        String avroCompressionLevel = avroCompressionLevel();
+        if (avroCompressionLevel != null) {
+          writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
+        }
+        break;
+
+      case ORC:
+        writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
+        writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
+        break;
+
+      default:
+        // skip
+    }
+
+    return writeProperties;
+  }
+
+  private Map<String, String> deleteWriteProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    FileFormat deleteFormat = deleteFileFormat();
+
+    switch (deleteFormat) {
+      case PARQUET:
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_PARQUET_COMPRESSION,
+            deleteParquetCompressionCodec(),
+            parquetCompressionCodec());
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_PARQUET_COMPRESSION_LEVEL,
+            deleteParquetCompressionLevel(),
+            parquetCompressionLevel());
+        break;
+
+      case AVRO:
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_AVRO_COMPRESSION,
+            deleteAvroCompressionCodec(),
+            avroCompressionCodec());
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_AVRO_COMPRESSION_LEVEL,
+            deleteAvroCompressionLevel(),
+            avroCompressionLevel());
+        break;
+
+      case ORC:
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_ORC_COMPRESSION,
+            deleteOrcCompressionCodec(),
+            orcCompressionCodec());
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_ORC_COMPRESSION_STRATEGY,
+            deleteOrcCompressionStrategy(),
+            orcCompressionStrategy());
+        break;
+
+      default:
+        // skip
+    }
+
+    return writeProperties;
+  }
+
+  private void setWritePropertyWithFallback(
+      Map<String, String> writeProperties, String key, String value, String fallbackValue) {
+    if (value != null) {
+      writeProperties.put(key, value);
+    } else if (fallbackValue != null) {
+      writeProperties.put(key, fallbackValue);
+    }
+  }
+
+  private String parquetCompressionCodec() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -435,7 +541,16 @@ public class SparkWriteConf {
         .parse();
   }
 
-  public String parquetCompressionLevel() {
+  private String deleteParquetCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_PARQUET_COMPRESSION)
+        .parseOptional();
+  }
+
+  private String parquetCompressionLevel() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_LEVEL)
@@ -445,7 +560,16 @@ public class SparkWriteConf {
         .parseOptional();
   }
 
-  public String avroCompressionCodec() {
+  private String deleteParquetCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
+        .parseOptional();
+  }
+
+  private String avroCompressionCodec() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -455,7 +579,16 @@ public class SparkWriteConf {
         .parse();
   }
 
-  public String avroCompressionLevel() {
+  private String deleteAvroCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_AVRO_COMPRESSION)
+        .parseOptional();
+  }
+
+  private String avroCompressionLevel() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_LEVEL)
@@ -465,7 +598,16 @@ public class SparkWriteConf {
         .parseOptional();
   }
 
-  public String orcCompressionCodec() {
+  private String deleteAvroCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
+        .parseOptional();
+  }
+
+  private String orcCompressionCodec() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -475,7 +617,16 @@ public class SparkWriteConf {
         .parse();
   }
 
-  public String orcCompressionStrategy() {
+  private String deleteOrcCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_ORC_COMPRESSION)
+        .parseOptional();
+  }
+
+  private String orcCompressionStrategy() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_STRATEGY)
@@ -485,35 +636,12 @@ public class SparkWriteConf {
         .parse();
   }
 
-  public Map<String, String> writeProperties(FileFormat format) {
-    Map<String, String> writeProperties = Maps.newHashMap();
-
-    switch (format) {
-      case PARQUET:
-        writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
-        String parquetCompressionLevel = parquetCompressionLevel();
-        if (parquetCompressionLevel != null) {
-          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
-        }
-        break;
-
-      case AVRO:
-        writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
-        String avroCompressionLevel = avroCompressionLevel();
-        if (avroCompressionLevel != null) {
-          writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel());
-        }
-        break;
-
-      case ORC:
-        writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
-        writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
-        break;
-
-      default:
-        // skip
-    }
-
-    return writeProperties;
+  private String deleteOrcCompressionStrategy() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_STRATEGY)
+        .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
+        .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
+        .parseOptional();
   }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 152279daaf..d0769eaa5f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
     this.fileSetId = writeConf.rewrittenFileSetId();
     this.specId = specId;
     this.partition = partition;
-    this.writeProperties = writeConf.writeProperties(format);
+    this.writeProperties = writeConf.writeProperties();
   }
 
   @Override
@@ -221,6 +221,7 @@ public class SparkPositionDeletesRewrite implements Write {
               .deleteFileFormat(format)
               .positionDeleteRowSchema(positionDeleteRowSchema)
               .positionDeleteSparkType(deleteSparkType)
+              .writeProperties(writeProperties)
               .build();
       SparkFileWriterFactory writerFactoryWithoutRow =
           SparkFileWriterFactory.builderFor(table)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index b80d283685..9fea33948b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -127,7 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.writeRequirements = writeConf.positionDeltaRequirements(command);
     this.context = new Context(dataSchema, writeConf, info, writeRequirements);
-    this.writeProperties = writeConf.writeProperties(context.dataFileFormat);
+    this.writeProperties = writeConf.writeProperties();
   }
 
   @Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d4a4f22bfd..15881098e7 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -129,7 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
     this.writeRequirements = writeRequirements;
     this.outputSpecId = writeConf.outputSpecId();
-    this.writeProperties = writeConf.writeProperties(format);
+    this.writeProperties = writeConf.writeProperties();
   }
 
   @Override
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 1dcdc1fc65..a7c22b4468 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -18,21 +18,41 @@
  */
 package org.apache.iceberg.spark;
 
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
+import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC;
+import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -164,6 +184,259 @@ public class TestSparkWriteConf extends SparkTestBaseWithCatalog {
         });
   }
 
+  @Test
+  public void testSparkConfOverride() {
+    List<List<Map<String, String>>> propertiesSuites =
+        Lists.newArrayList(
+            Lists.newArrayList(
+                ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    TableProperties.PARQUET_COMPRESSION,
+                    "gzip",
+                    TableProperties.DELETE_PARQUET_COMPRESSION,
+                    "snappy"),
+                ImmutableMap.of(
+                    DELETE_PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "3",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "3")),
+            Lists.newArrayList(
+                ImmutableMap.of(
+                    COMPRESSION_CODEC,
+                    "zstd",
+                    SparkSQLProperties.COMPRESSION_STRATEGY,
+                    "compression"),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "orc",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "orc",
+                    ORC_COMPRESSION,
+                    "zlib",
+                    DELETE_ORC_COMPRESSION,
+                    "snappy"),
+                ImmutableMap.of(
+                    DELETE_ORC_COMPRESSION,
+                    "zstd",
+                    ORC_COMPRESSION,
+                    "zstd",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "compression",
+                    ORC_COMPRESSION_STRATEGY,
+                    "compression")),
+            Lists.newArrayList(
+                ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "avro",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "avro",
+                    AVRO_COMPRESSION,
+                    "gzip",
+                    DELETE_AVRO_COMPRESSION,
+                    "snappy"),
+                ImmutableMap.of(
+                    DELETE_AVRO_COMPRESSION,
+                    "zstd",
+                    AVRO_COMPRESSION,
+                    "zstd",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9",
+                    DELETE_AVRO_COMPRESSION_LEVEL,
+                    "9")));
+    for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+      testWriteProperties(propertiesSuite);
+    }
+  }
+
+  @Test
+  public void testDataPropsDefaultsAsDeleteProps() {
+    List<List<Map<String, String>>> propertiesSuites =
+        Lists.newArrayList(
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5"),
+                ImmutableMap.of(
+                    DELETE_PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "5")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "orc",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "orc",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed"),
+                ImmutableMap.of(
+                    DELETE_ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "speed")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "avro",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "avro",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9"),
+                ImmutableMap.of(
+                    DELETE_AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9",
+                    DELETE_AVRO_COMPRESSION_LEVEL,
+                    "9")));
+    for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+      testWriteProperties(propertiesSuite);
+    }
+  }
+
+  @Test
+  public void testDeleteFileWriteConf() {
+    List<List<Map<String, String>>> propertiesSuites =
+        Lists.newArrayList(
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    TableProperties.PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "6"),
+                ImmutableMap.of(
+                    DELETE_PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "6")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "orc",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "orc",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed",
+                    DELETE_ORC_COMPRESSION,
+                    "zstd",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "compression"),
+                ImmutableMap.of(
+                    DELETE_ORC_COMPRESSION,
+                    "zstd",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "compression")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "avro",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "avro",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9",
+                    DELETE_AVRO_COMPRESSION,
+                    "zstd",
+                    DELETE_AVRO_COMPRESSION_LEVEL,
+                    "16"),
+                ImmutableMap.of(
+                    DELETE_AVRO_COMPRESSION,
+                    "zstd",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9",
+                    DELETE_AVRO_COMPRESSION_LEVEL,
+                    "16")));
+    for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+      testWriteProperties(propertiesSuite);
+    }
+  }
+
+  private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
+    withSQLConf(
+        propertiesSuite.get(0),
+        () -> {
+          Table table = validationCatalog.loadTable(tableIdent);
+          Map<String, String> tableProperties = propertiesSuite.get(1);
+          UpdateProperties updateProperties = table.updateProperties();
+          for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
+            updateProperties.set(entry.getKey(), entry.getValue());
+          }
+
+          updateProperties.commit();
+
+          Map<String, String> writeOptions = ImmutableMap.of();
+          SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions);
+          Map<String, String> writeProperties = writeConf.writeProperties();
+          Map<String, String> expectedProperties = propertiesSuite.get(2);
+          Assert.assertEquals(expectedProperties.size(), writeConf.writeProperties().size());
+          for (Map.Entry<String, String> entry : writeProperties.entrySet()) {
+            Assert.assertEquals(entry.getValue(), expectedProperties.get(entry.getKey()));
+          }
+
+          table.refresh();
+          updateProperties = table.updateProperties();
+          for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
+            updateProperties.remove(entry.getKey());
+          }
+
+          updateProperties.commit();
+        });
+  }
+
   private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) {
     Assert.assertEquals(expectedMode, writeConf.distributionMode());
     Assert.assertEquals(expectedMode, writeConf.copyOnWriteDistributionMode(DELETE));
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index cc3aa9121b..760e735ace 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -22,8 +22,11 @@ import static org.apache.iceberg.FileFormat.PARQUET;
 import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
 import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
@@ -132,6 +135,9 @@ public class TestCompressionSettings extends SparkCatalogTestBase {
     tableProperties.put(PARQUET_COMPRESSION, "gzip");
     tableProperties.put(AVRO_COMPRESSION, "gzip");
     tableProperties.put(ORC_COMPRESSION, "zlib");
+    tableProperties.put(DELETE_PARQUET_COMPRESSION, "gzip");
+    tableProperties.put(DELETE_AVRO_COMPRESSION, "gzip");
+    tableProperties.put(DELETE_ORC_COMPRESSION, "zlib");
     tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName());
     tableProperties.put(FORMAT_VERSION, "2");
     sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, format);

Reply via email to