This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3ed03c646a Spark 3.4: Fix write and SQL options to override delete
file compression config (#8438)
3ed03c646a is described below
commit 3ed03c646ada141944c217fd9bff3d9671f7a00c
Author: roryqi <[email protected]>
AuthorDate: Thu Sep 7 09:16:50 2023 +0800
Spark 3.4: Fix write and SQL options to override delete file compression
config (#8438)
---
.../org/apache/iceberg/spark/SparkWriteConf.java | 200 ++++++++++++---
.../spark/source/SparkPositionDeletesRewrite.java | 3 +-
.../spark/source/SparkPositionDeltaWrite.java | 2 +-
.../apache/iceberg/spark/source/SparkWrite.java | 2 +-
.../apache/iceberg/spark/TestSparkWriteConf.java | 273 +++++++++++++++++++++
.../spark/source/TestCompressionSettings.java | 6 +
6 files changed, 447 insertions(+), 39 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index ec9825d40b..df3e2051f7 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -23,6 +23,12 @@ import static org.apache.iceberg.DistributionMode.NONE;
import static org.apache.iceberg.DistributionMode.RANGE;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static
org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static
org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
@@ -425,7 +431,107 @@ public class SparkWriteConf {
return branch;
}
- public String parquetCompressionCodec() {
+ /**
+ * Returns the compression-related write properties for both data files and delete files.
+ *
+ * <p>Data-file entries are added first and delete-file entries second, so if the two maps ever
+ * shared a key the delete-file value would win. Each call builds and returns a new mutable map.
+ */
+ public Map<String, String> writeProperties() {
+ Map<String, String> writeProperties = Maps.newHashMap();
+ writeProperties.putAll(dataWriteProperties());
+ writeProperties.putAll(deleteWriteProperties());
+ return writeProperties;
+ }
+
+ /**
+ * Builds the compression write properties for data files, keyed by the data-file table property
+ * names for the configured data file format.
+ *
+ * <p>Parquet and Avro get a codec plus an optional level (omitted when no level is configured);
+ * ORC gets a codec and a strategy. Any other format yields an empty map.
+ */
+ private Map<String, String> dataWriteProperties() {
+ Map<String, String> writeProperties = Maps.newHashMap();
+ FileFormat dataFormat = dataFileFormat();
+
+ switch (dataFormat) {
+ case PARQUET:
+ writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
+ String parquetCompressionLevel = parquetCompressionLevel();
+ // the level getter may return null; only write the key when a level is configured
+ if (parquetCompressionLevel != null) {
+ writeProperties.put(PARQUET_COMPRESSION_LEVEL,
parquetCompressionLevel);
+ }
+ break;
+
+ case AVRO:
+ writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
+ String avroCompressionLevel = avroCompressionLevel();
+ // same optional-level handling as Parquet
+ if (avroCompressionLevel != null) {
+ writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
+ }
+ break;
+
+ case ORC:
+ // NOTE(review): both ORC getters end in parse() rather than parseOptional(), so these values
+ // are presumably never null; confirm against the defaults in the full getter bodies.
+ writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
+ writeProperties.put(ORC_COMPRESSION_STRATEGY,
orcCompressionStrategy());
+ break;
+
+ default:
+ // skip: no compression properties are produced for other formats
+ }
+
+ return writeProperties;
+ }
+
+ /**
+ * Builds the compression write properties for delete files, keyed by the delete-specific table
+ * property names for the configured delete file format.
+ *
+ * <p>Each key takes the delete-specific value when one resolves, otherwise the value resolved
+ * for data files of the same format. Data-file settings therefore act as defaults for delete
+ * files. A key is left out only when both values are null. Any other format yields an empty map.
+ */
+ private Map<String, String> deleteWriteProperties() {
+ Map<String, String> writeProperties = Maps.newHashMap();
+ FileFormat deleteFormat = deleteFileFormat();
+
+ switch (deleteFormat) {
+ case PARQUET:
+ // delete-specific codec/level first, data-file Parquet codec/level as the fallback
+ setWritePropertyWithFallback(
+ writeProperties,
+ DELETE_PARQUET_COMPRESSION,
+ deleteParquetCompressionCodec(),
+ parquetCompressionCodec());
+ setWritePropertyWithFallback(
+ writeProperties,
+ DELETE_PARQUET_COMPRESSION_LEVEL,
+ deleteParquetCompressionLevel(),
+ parquetCompressionLevel());
+ break;
+
+ case AVRO:
+ // delete-specific codec/level first, data-file Avro codec/level as the fallback
+ setWritePropertyWithFallback(
+ writeProperties,
+ DELETE_AVRO_COMPRESSION,
+ deleteAvroCompressionCodec(),
+ avroCompressionCodec());
+ setWritePropertyWithFallback(
+ writeProperties,
+ DELETE_AVRO_COMPRESSION_LEVEL,
+ deleteAvroCompressionLevel(),
+ avroCompressionLevel());
+ break;
+
+ case ORC:
+ // delete-specific codec/strategy first, data-file ORC codec/strategy as the fallback
+ setWritePropertyWithFallback(
+ writeProperties,
+ DELETE_ORC_COMPRESSION,
+ deleteOrcCompressionCodec(),
+ orcCompressionCodec());
+ setWritePropertyWithFallback(
+ writeProperties,
+ DELETE_ORC_COMPRESSION_STRATEGY,
+ deleteOrcCompressionStrategy(),
+ orcCompressionStrategy());
+ break;
+
+ default:
+ // skip: no compression properties are produced for other formats
+ }
+
+ return writeProperties;
+ }
+
+ /**
+ * Puts {@code value} under {@code key} when it is non-null, otherwise puts {@code fallbackValue}
+ * when that is non-null. When both are null, {@code writeProperties} is left unchanged.
+ *
+ * @param writeProperties map to update in place
+ * @param key property name to set
+ * @param value preferred value, may be null
+ * @param fallbackValue value used when {@code value} is null, may be null
+ */
+ private void setWritePropertyWithFallback(
+ Map<String, String> writeProperties, String key, String value, String
fallbackValue) {
+ if (value != null) {
+ writeProperties.put(key, value);
+ } else if (fallbackValue != null) {
+ writeProperties.put(key, fallbackValue);
+ }
+ }
+
+ private String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -435,7 +541,16 @@ public class SparkWriteConf {
.parse();
}
- public String parquetCompressionLevel() {
+ /**
+ * Resolves the Parquet compression codec for delete files.
+ *
+ * <p>Sources:
+ * <ul>
+ * <li>the {@code SparkWriteOptions.COMPRESSION_CODEC} write option;
+ * <li>the {@code SparkSQLProperties.COMPRESSION_CODEC} session conf;
+ * <li>the {@code DELETE_PARQUET_COMPRESSION} table property.
+ * </ul>
+ * NOTE(review): precedence between these is decided by confParser, presumably in this order.
+ *
+ * <p>The write option and session conf are the same ones used for data files. That is what
+ * lets them override the delete-specific table property. Uses parseOptional(); callers treat a
+ * null result as "unset" and fall back to the data-file codec.
+ */
+ private String deleteParquetCompressionCodec() {
+ return confParser
+ .stringConf()
+ .option(SparkWriteOptions.COMPRESSION_CODEC)
+ .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+ .tableProperty(DELETE_PARQUET_COMPRESSION)
+ .parseOptional();
+ }
+
+ private String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
@@ -445,7 +560,16 @@ public class SparkWriteConf {
.parseOptional();
}
- public String avroCompressionCodec() {
+ /**
+ * Resolves the Parquet compression level for delete files.
+ *
+ * <p>Sources:
+ * <ul>
+ * <li>the {@code SparkWriteOptions.COMPRESSION_LEVEL} write option;
+ * <li>the {@code SparkSQLProperties.COMPRESSION_LEVEL} session conf;
+ * <li>the {@code DELETE_PARQUET_COMPRESSION_LEVEL} table property.
+ * </ul>
+ * NOTE(review): precedence is decided by confParser, presumably in this order.
+ *
+ * <p>Uses parseOptional(); callers treat a null result as "unset" and fall back to the
+ * data-file Parquet level.
+ */
+ private String deleteParquetCompressionLevel() {
+ return confParser
+ .stringConf()
+ .option(SparkWriteOptions.COMPRESSION_LEVEL)
+ .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+ .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
+ .parseOptional();
+ }
+
+ private String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -455,7 +579,16 @@ public class SparkWriteConf {
.parse();
}
- public String avroCompressionLevel() {
+ /**
+ * Resolves the Avro compression codec for delete files.
+ *
+ * <p>Sources:
+ * <ul>
+ * <li>the {@code SparkWriteOptions.COMPRESSION_CODEC} write option;
+ * <li>the {@code SparkSQLProperties.COMPRESSION_CODEC} session conf;
+ * <li>the {@code DELETE_AVRO_COMPRESSION} table property.
+ * </ul>
+ * NOTE(review): precedence is decided by confParser, presumably in this order.
+ *
+ * <p>Uses parseOptional(); callers treat a null result as "unset" and fall back to the
+ * data-file Avro codec.
+ */
+ private String deleteAvroCompressionCodec() {
+ return confParser
+ .stringConf()
+ .option(SparkWriteOptions.COMPRESSION_CODEC)
+ .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+ .tableProperty(DELETE_AVRO_COMPRESSION)
+ .parseOptional();
+ }
+
+ private String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
@@ -465,7 +598,16 @@ public class SparkWriteConf {
.parseOptional();
}
- public String orcCompressionCodec() {
+ /**
+ * Resolves the Avro compression level for delete files.
+ *
+ * <p>Sources:
+ * <ul>
+ * <li>the {@code SparkWriteOptions.COMPRESSION_LEVEL} write option;
+ * <li>the {@code SparkSQLProperties.COMPRESSION_LEVEL} session conf;
+ * <li>the {@code DELETE_AVRO_COMPRESSION_LEVEL} table property.
+ * </ul>
+ * NOTE(review): precedence is decided by confParser, presumably in this order.
+ *
+ * <p>Uses parseOptional(); callers treat a null result as "unset" and fall back to the
+ * data-file Avro level.
+ */
+ private String deleteAvroCompressionLevel() {
+ return confParser
+ .stringConf()
+ .option(SparkWriteOptions.COMPRESSION_LEVEL)
+ .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+ .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
+ .parseOptional();
+ }
+
+ private String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -475,7 +617,16 @@ public class SparkWriteConf {
.parse();
}
- public String orcCompressionStrategy() {
+ /**
+ * Resolves the ORC compression codec for delete files.
+ *
+ * <p>Sources:
+ * <ul>
+ * <li>the {@code SparkWriteOptions.COMPRESSION_CODEC} write option;
+ * <li>the {@code SparkSQLProperties.COMPRESSION_CODEC} session conf;
+ * <li>the {@code DELETE_ORC_COMPRESSION} table property.
+ * </ul>
+ * NOTE(review): precedence is decided by confParser, presumably in this order.
+ *
+ * <p>Uses parseOptional(); callers treat a null result as "unset" and fall back to the
+ * data-file ORC codec.
+ */
+ private String deleteOrcCompressionCodec() {
+ return confParser
+ .stringConf()
+ .option(SparkWriteOptions.COMPRESSION_CODEC)
+ .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+ .tableProperty(DELETE_ORC_COMPRESSION)
+ .parseOptional();
+ }
+
+ private String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
@@ -485,35 +636,12 @@ public class SparkWriteConf {
.parse();
}
- public Map<String, String> writeProperties(FileFormat format) {
- Map<String, String> writeProperties = Maps.newHashMap();
-
- switch (format) {
- case PARQUET:
- writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
- String parquetCompressionLevel = parquetCompressionLevel();
- if (parquetCompressionLevel != null) {
- writeProperties.put(PARQUET_COMPRESSION_LEVEL,
parquetCompressionLevel);
- }
- break;
-
- case AVRO:
- writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
- String avroCompressionLevel = avroCompressionLevel();
- if (avroCompressionLevel != null) {
- writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel());
- }
- break;
-
- case ORC:
- writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
- writeProperties.put(ORC_COMPRESSION_STRATEGY,
orcCompressionStrategy());
- break;
-
- default:
- // skip
- }
-
- return writeProperties;
+ /**
+ * Resolves the ORC compression strategy for delete files.
+ *
+ * <p>Sources:
+ * <ul>
+ * <li>the {@code SparkWriteOptions.COMPRESSION_STRATEGY} write option;
+ * <li>the {@code SparkSQLProperties.COMPRESSION_STRATEGY} session conf;
+ * <li>the {@code DELETE_ORC_COMPRESSION_STRATEGY} table property.
+ * </ul>
+ * NOTE(review): precedence is decided by confParser, presumably in this order.
+ *
+ * <p>Uses parseOptional(); callers treat a null result as "unset" and fall back to the
+ * data-file ORC strategy.
+ */
+ private String deleteOrcCompressionStrategy() {
+ return confParser
+ .stringConf()
+ .option(SparkWriteOptions.COMPRESSION_STRATEGY)
+ .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
+ .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
+ .parseOptional();
 }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 152279daaf..d0769eaa5f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.fileSetId = writeConf.rewrittenFileSetId();
this.specId = specId;
this.partition = partition;
- this.writeProperties = writeConf.writeProperties(format);
+ this.writeProperties = writeConf.writeProperties();
}
@Override
@@ -221,6 +221,7 @@ public class SparkPositionDeletesRewrite implements Write {
.deleteFileFormat(format)
.positionDeleteRowSchema(positionDeleteRowSchema)
.positionDeleteSparkType(deleteSparkType)
+ .writeProperties(writeProperties)
.build();
SparkFileWriterFactory writerFactoryWithoutRow =
SparkFileWriterFactory.builderFor(table)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index b80d283685..9fea33948b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -127,7 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
- this.writeProperties = writeConf.writeProperties(context.dataFileFormat);
+ this.writeProperties = writeConf.writeProperties();
}
@Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d4a4f22bfd..15881098e7 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -129,7 +129,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
this.writeRequirements = writeRequirements;
this.outputSpecId = writeConf.outputSpecId();
- this.writeProperties = writeConf.writeProperties(format);
+ this.writeProperties = writeConf.writeProperties();
}
@Override
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 1dcdc1fc65..a7c22b4468 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -18,21 +18,41 @@
*/
package org.apache.iceberg.spark;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static
org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static
org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
+import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC;
+import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL;
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE;
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -164,6 +184,259 @@ public class TestSparkWriteConf extends
SparkTestBaseWithCatalog {
});
}
+ /**
+ * Verifies that the generic Spark SQL compression confs override both the data-file and the
+ * delete-file compression table properties, for Parquet, ORC and Avro.
+ *
+ * <p>Each suite is {@code [session confs, table properties, expected write properties]}. In
+ * every suite the table sets different data and delete codecs, yet the expected output uses the
+ * session codec (and level/strategy) for both.
+ */
+ @Test
+ public void testSparkConfOverride() {
+ List<List<Map<String, String>>> propertiesSuites =
+ Lists.newArrayList(
+ Lists.newArrayList(
+ ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL,
"3"),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "parquet",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "parquet",
+ TableProperties.PARQUET_COMPRESSION,
+ "gzip",
+ TableProperties.DELETE_PARQUET_COMPRESSION,
+ "snappy"),
+ ImmutableMap.of(
+ DELETE_PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION_LEVEL,
+ "3",
+ DELETE_PARQUET_COMPRESSION_LEVEL,
+ "3")),
+ Lists.newArrayList(
+ ImmutableMap.of(
+ COMPRESSION_CODEC,
+ "zstd",
+ SparkSQLProperties.COMPRESSION_STRATEGY,
+ "compression"),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "orc",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "orc",
+ ORC_COMPRESSION,
+ "zlib",
+ DELETE_ORC_COMPRESSION,
+ "snappy"),
+ ImmutableMap.of(
+ DELETE_ORC_COMPRESSION,
+ "zstd",
+ ORC_COMPRESSION,
+ "zstd",
+ DELETE_ORC_COMPRESSION_STRATEGY,
+ "compression",
+ ORC_COMPRESSION_STRATEGY,
+ "compression")),
+ Lists.newArrayList(
+ ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL,
"9"),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "avro",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "avro",
+ AVRO_COMPRESSION,
+ "gzip",
+ DELETE_AVRO_COMPRESSION,
+ "snappy"),
+ ImmutableMap.of(
+ DELETE_AVRO_COMPRESSION,
+ "zstd",
+ AVRO_COMPRESSION,
+ "zstd",
+ AVRO_COMPRESSION_LEVEL,
+ "9",
+ DELETE_AVRO_COMPRESSION_LEVEL,
+ "9")));
+ for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+ testWriteProperties(propertiesSuite);
+ }
+ }
+
+ /**
+ * Verifies that, with no session confs and no delete-specific table properties, the data-file
+ * compression table properties are reused as the delete-file values for Parquet, ORC and Avro.
+ *
+ * <p>Each suite is {@code [session confs, table properties, expected write properties]}.
+ */
+ @Test
+ public void testDataPropsDefaultsAsDeleteProps() {
+ List<List<Map<String, String>>> propertiesSuites =
+ Lists.newArrayList(
+ Lists.newArrayList(
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "parquet",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "parquet",
+ PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION_LEVEL,
+ "5"),
+ ImmutableMap.of(
+ DELETE_PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION_LEVEL,
+ "5",
+ DELETE_PARQUET_COMPRESSION_LEVEL,
+ "5")),
+ Lists.newArrayList(
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "orc",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "orc",
+ ORC_COMPRESSION,
+ "snappy",
+ ORC_COMPRESSION_STRATEGY,
+ "speed"),
+ ImmutableMap.of(
+ DELETE_ORC_COMPRESSION,
+ "snappy",
+ ORC_COMPRESSION,
+ "snappy",
+ ORC_COMPRESSION_STRATEGY,
+ "speed",
+ DELETE_ORC_COMPRESSION_STRATEGY,
+ "speed")),
+ Lists.newArrayList(
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "avro",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "avro",
+ AVRO_COMPRESSION,
+ "snappy",
+ AVRO_COMPRESSION_LEVEL,
+ "9"),
+ ImmutableMap.of(
+ DELETE_AVRO_COMPRESSION,
+ "snappy",
+ AVRO_COMPRESSION,
+ "snappy",
+ AVRO_COMPRESSION_LEVEL,
+ "9",
+ DELETE_AVRO_COMPRESSION_LEVEL,
+ "9")));
+ for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+ testWriteProperties(propertiesSuite);
+ }
+ }
+
+ /**
+ * Verifies that delete-specific compression table properties apply to delete files while data
+ * files keep the data-file properties, for Parquet, ORC and Avro.
+ *
+ * <p>Each suite is {@code [session confs, table properties, expected write properties]}. In the
+ * Parquet suite only the delete level is overridden, so the delete codec still falls back to
+ * the data-file codec.
+ */
+ @Test
+ public void testDeleteFileWriteConf() {
+ List<List<Map<String, String>>> propertiesSuites =
+ Lists.newArrayList(
+ Lists.newArrayList(
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "parquet",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "parquet",
+ TableProperties.PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION_LEVEL,
+ "5",
+ DELETE_PARQUET_COMPRESSION_LEVEL,
+ "6"),
+ ImmutableMap.of(
+ DELETE_PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION,
+ "zstd",
+ PARQUET_COMPRESSION_LEVEL,
+ "5",
+ DELETE_PARQUET_COMPRESSION_LEVEL,
+ "6")),
+ Lists.newArrayList(
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "orc",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "orc",
+ ORC_COMPRESSION,
+ "snappy",
+ ORC_COMPRESSION_STRATEGY,
+ "speed",
+ DELETE_ORC_COMPRESSION,
+ "zstd",
+ DELETE_ORC_COMPRESSION_STRATEGY,
+ "compression"),
+ ImmutableMap.of(
+ DELETE_ORC_COMPRESSION,
+ "zstd",
+ ORC_COMPRESSION,
+ "snappy",
+ ORC_COMPRESSION_STRATEGY,
+ "speed",
+ DELETE_ORC_COMPRESSION_STRATEGY,
+ "compression")),
+ Lists.newArrayList(
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ DEFAULT_FILE_FORMAT,
+ "avro",
+ DELETE_DEFAULT_FILE_FORMAT,
+ "avro",
+ AVRO_COMPRESSION,
+ "snappy",
+ AVRO_COMPRESSION_LEVEL,
+ "9",
+ DELETE_AVRO_COMPRESSION,
+ "zstd",
+ DELETE_AVRO_COMPRESSION_LEVEL,
+ "16"),
+ ImmutableMap.of(
+ DELETE_AVRO_COMPRESSION,
+ "zstd",
+ AVRO_COMPRESSION,
+ "snappy",
+ AVRO_COMPRESSION_LEVEL,
+ "9",
+ DELETE_AVRO_COMPRESSION_LEVEL,
+ "16")));
+ for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+ testWriteProperties(propertiesSuite);
+ }
+ }
+
+ /**
+ * Applies one properties suite and checks the map returned by {@link
+ * SparkWriteConf#writeProperties()}.
+ *
+ * <p>{@code propertiesSuite} holds three maps:
+ * <ul>
+ * <li>index 0 is passed to {@code withSQLConf} as session confs;
+ * <li>index 1 is committed as table properties before the {@link SparkWriteConf} is built;
+ * <li>index 2 is the exact map {@code writeProperties()} is expected to return.
+ * </ul>
+ * The index-1 table properties are removed again afterwards, also when the check fails.
+ */
+ private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
+ withSQLConf(
+ propertiesSuite.get(0),
+ () -> {
+ Table table = validationCatalog.loadTable(tableIdent);
+ Map<String, String> tableProperties = propertiesSuite.get(1);
+ UpdateProperties updateProperties = table.updateProperties();
+ for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
+ updateProperties.set(entry.getKey(), entry.getValue());
+ }
+
+ updateProperties.commit();
+
+ try {
+ Map<String, String> writeOptions = ImmutableMap.of();
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions);
+ Map<String, String> writeProperties = writeConf.writeProperties();
+ Map<String, String> expectedProperties = propertiesSuite.get(2);
+ // JUnit takes (expected, actual). Comparing whole maps also names missing or unexpected
+ // keys in the failure message, instead of reporting only a size mismatch.
+ Assert.assertEquals(expectedProperties, writeProperties);
+ } finally {
+ // Remove this suite's table properties even on failure so they cannot leak into the
+ // next suite run against the same table.
+ table.refresh();
+ UpdateProperties removeProperties = table.updateProperties();
+ for (String key : tableProperties.keySet()) {
+ removeProperties.remove(key);
+ }
+
+ removeProperties.commit();
+ }
+ });
+ }
+
private void checkMode(DistributionMode expectedMode, SparkWriteConf
writeConf) {
Assert.assertEquals(expectedMode, writeConf.distributionMode());
Assert.assertEquals(expectedMode,
writeConf.copyOnWriteDistributionMode(DELETE));
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index cc3aa9121b..760e735ace 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -22,8 +22,11 @@ import static org.apache.iceberg.FileFormat.PARQUET;
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
@@ -132,6 +135,9 @@ public class TestCompressionSettings extends
SparkCatalogTestBase {
tableProperties.put(PARQUET_COMPRESSION, "gzip");
tableProperties.put(AVRO_COMPRESSION, "gzip");
tableProperties.put(ORC_COMPRESSION, "zlib");
+ tableProperties.put(DELETE_PARQUET_COMPRESSION, "gzip");
+ tableProperties.put(DELETE_AVRO_COMPRESSION, "gzip");
+ tableProperties.put(DELETE_ORC_COMPRESSION, "zlib");
tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName());
tableProperties.put(FORMAT_VERSION, "2");
sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName,
DEFAULT_FILE_FORMAT, format);