This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new faab813678 Core, Spark 3.4:  Write properties of PositionDeletesTable should respect ones of BaseTable (#8428)
faab813678 is described below

commit faab8136788234f9217ac98bcec5c7acbd584625
Author: roryqi <[email protected]>
AuthorDate: Tue Sep 19 10:55:25 2023 +0800

    Core, Spark 3.4:  Write properties of PositionDeletesTable should respect ones of BaseTable (#8428)
---
 .../org/apache/iceberg/PositionDeletesTable.java   | 12 ++++++++++
 .../spark/source/TestCompressionSettings.java      | 26 ++++++++++------------
 2 files changed, 24 insertions(+), 14 deletions(-)
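
For readers skimming the diff below: the core change overrides PositionDeletesTable#properties() so the position deletes metadata table inherits the base table's write.* settings. A minimal, standalone sketch of that prefix-filtering approach (class and method names here are illustrative, not part of the patch):

    import java.util.Collections;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class WritePropertiesFilter {
      // Returns an unmodifiable view containing only keys prefixed with "write.".
      static Map<String, String> writeProperties(Map<String, String> baseProperties) {
        return Collections.unmodifiableMap(
            baseProperties.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith("write."))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
      }

      public static void main(String[] args) {
        Map<String, String> base =
            Map.of(
                "write.parquet.compression-codec", "zstd",
                "format-version", "2");
        // Prints only the write.* entry; non-write properties are filtered out.
        System.out.println(writeProperties(base));
      }
    }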

diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index f8cb924e53..dd01c789c8 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -21,9 +21,11 @@ package org.apache.iceberg;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -93,6 +95,16 @@ public class PositionDeletesTable extends BaseMetadataTable {
     return specs;
   }
 
+  @Override
+  public Map<String, String> properties() {
+    // The write properties are needed by PositionDeletesRewriteAction,
+    // these properties should respect the ones of BaseTable.
+    return Collections.unmodifiableMap(
+        table().properties().entrySet().stream()
+            .filter(entry -> entry.getKey().startsWith("write."))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+  }
+
   private Schema calculateSchema() {
     Types.StructType partitionType = Partitioning.partitionType(table());
     Schema result =
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index 760e735ace..1ec3c4726d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -188,20 +188,18 @@ public class TestCompressionSettings extends SparkCatalogTestBase {
           .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
     }
 
-    if (PARQUET.equals(format)) {
-      SparkActions.get(spark)
-          .rewritePositionDeletes(table)
-          .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
-          .execute();
-      table.refresh();
-      deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io());
-      try (ManifestReader<DeleteFile> reader =
-          ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) {
-        DeleteFile file = reader.iterator().next();
-        InputFile inputFile = table.io().newInputFile(file.path().toString());
-        Assertions.assertThat(getCompressionType(inputFile))
-            .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
-      }
+    SparkActions.get(spark)
+        .rewritePositionDeletes(table)
+        .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .execute();
+    table.refresh();
+    deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io());
+    try (ManifestReader<DeleteFile> reader =
+        ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0), table.io(), specMap)) {
+      DeleteFile file = reader.iterator().next();
+      InputFile inputFile = table.io().newInputFile(file.path().toString());
+      Assertions.assertThat(getCompressionType(inputFile))
+          .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
     }
   }
 

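A hedged usage sketch of how a caller such as the position deletes rewrite could now observe the inherited write properties. MetadataTableUtils.createMetadataTableInstance, MetadataTableType.POSITION_DELETES, and the pre-loaded 'baseTable' are assumed entry points, not part of this patch:

    import java.util.Map;
    import org.apache.iceberg.MetadataTableType;
    import org.apache.iceberg.MetadataTableUtils;
    import org.apache.iceberg.Table;

    public class PositionDeletesPropertiesCheck {
      // 'baseTable' is assumed to already carry write.parquet.compression-codec,
      // e.g. set earlier via baseTable.updateProperties().set(...).commit().
      static void printWriteCodec(Table baseTable) {
        Table positionDeletesTable =
            MetadataTableUtils.createMetadataTableInstance(
                baseTable, MetadataTableType.POSITION_DELETES);

        Map<String, String> props = positionDeletesTable.properties();

        // With this patch, the metadata table mirrors the base table's write.* settings.
        System.out.println(
            "position_deletes write codec: " + props.get("write.parquet.compression-codec"));
      }
    }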