This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new faab813678 Core, Spark 3.4: Write properties of PositionDeletesTable
should respect ones of BaseTable (#8428)
faab813678 is described below
commit faab8136788234f9217ac98bcec5c7acbd584625
Author: roryqi <[email protected]>
AuthorDate: Tue Sep 19 10:55:25 2023 +0800
Core, Spark 3.4: Write properties of PositionDeletesTable should respect
ones of BaseTable (#8428)
---
.../org/apache/iceberg/PositionDeletesTable.java | 12 ++++++++++
.../spark/source/TestCompressionSettings.java | 26 ++++++++++------------
2 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index f8cb924e53..dd01c789c8 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -21,9 +21,11 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -93,6 +95,16 @@ public class PositionDeletesTable extends BaseMetadataTable {
return specs;
}
+ @Override
+ public Map<String, String> properties() {
+ // Expose, as a read-only map, only the properties of table() whose keys start with
+ // "write."; PositionDeletesRewriteAction needs these to respect the ones of BaseTable.
+ return Collections.unmodifiableMap(
+ table().properties().entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith("write."))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
+ }
+
private Schema calculateSchema() {
Types.StructType partitionType = Partitioning.partitionType(table());
Schema result =
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index 760e735ace..1ec3c4726d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -188,20 +188,18 @@ public class TestCompressionSettings extends SparkCatalogTestBase {
.isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
}
- if (PARQUET.equals(format)) {
- SparkActions.get(spark)
- .rewritePositionDeletes(table)
- .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
- .execute();
- table.refresh();
- deleteManifestFiles =
table.currentSnapshot().deleteManifests(table.io());
- try (ManifestReader<DeleteFile> reader =
- ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0),
table.io(), specMap)) {
- DeleteFile file = reader.iterator().next();
- InputFile inputFile = table.io().newInputFile(file.path().toString());
- Assertions.assertThat(getCompressionType(inputFile))
- .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
- }
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .execute();
+ table.refresh();
+ deleteManifestFiles = table.currentSnapshot().deleteManifests(table.io());
+ try (ManifestReader<DeleteFile> reader =
+ ManifestFiles.readDeleteManifest(deleteManifestFiles.get(0),
table.io(), specMap)) {
+ DeleteFile file = reader.iterator().next();
+ InputFile inputFile = table.io().newInputFile(file.path().toString());
+ Assertions.assertThat(getCompressionType(inputFile))
+ .isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
}
}