This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c8e140d640 Spark 3.4: Remove 'snapshot-property' prefix in CommitMetadata properties (#7986)
c8e140d640 is described below

commit c8e140d640564d1b02525c53be23c3628c2a5249
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Jul 5 14:27:09 2023 +0200

    Spark 3.4: Remove 'snapshot-property' prefix in CommitMetadata properties (#7986)
---
 docs/spark-configuration.md                        |  4 +--
 .../org/apache/iceberg/spark/CommitMetadata.java   | 12 ++++++--
 .../spark/source/TestDataSourceOptions.java        | 34 +++++++++++++++++-----
 3 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md
index 8d02246904..3a91f21176 100644
--- a/docs/spark-configuration.md
+++ b/docs/spark-configuration.md
@@ -189,13 +189,13 @@ df.write
 | write-format           | Table write.format.default | File format to use for 
this write operation; parquet, avro, or orc |
 | target-file-size-bytes | As per table property      | Overrides this table's 
write.target-file-size-bytes          |
 | check-nullability      | true                       | Sets the nullable 
check on fields                            |
-| snapshot-property._custom-key_    | null            | Adds an entry with 
custom-key and corresponding value in the snapshot summary  |
+| snapshot-property._custom-key_    | null            | Adds an entry with 
custom-key and corresponding value in the snapshot summary (the 
`snapshot-property.` prefix is only required for DSv2)  |
 | fanout-enabled       | false        | Overrides this table's 
write.spark.fanout.enabled  |
 | check-ordering       | true        | Checks if input schema and table schema 
are same  |
 | isolation-level | null | Desired isolation level for Dataframe overwrite 
operations.  `null` => no checks (for idempotent writes), `serializable` => 
check for concurrent inserts or deletes in destination partitions, `snapshot` 
=> checks for concurrent deletes in destination partitions. |
 | validate-from-snapshot-id | null | If isolation level is set, id of base 
snapshot from which to check concurrent write conflicts into a table. Should be 
the snapshot before any reads from the table. Can be obtained via [Table 
API](../../api#table-metadata) or [Snapshots 
table](../spark-queries#snapshots). If null, the table's oldest known snapshot 
is used. |
 
-CommitMetadata provides an interface to add custom metadata to a snapshot 
summary during a SQL execution, which can be beneficial for purposes such as 
auditing or change tracking. Here is an example:
+CommitMetadata provides an interface to add custom metadata to a snapshot 
summary during a SQL execution, which can be beneficial for purposes such as 
auditing or change tracking. If properties start with `snapshot-property.`, 
then that prefix will be removed from each property. Here is an example:
 
 ```java
 import org.apache.iceberg.spark.CommitMetadata;
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
 
 import java.util.Map;
 import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ExceptionUtil;
 
 /** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
    * running the code wrapped as a caller, and any snapshot committed within 
the callable object
    * will be attached with the metadata defined in properties
    *
-   * @param properties extra commit metadata to attach to the snapshot 
committed within callable
+   * @param properties extra commit metadata to attach to the snapshot 
committed within callable.
+   *     The prefix will be removed for properties starting with {@link
+   *     SnapshotSummary#EXTRA_METADATA_PREFIX}
    * @param callable the code to be executed
    * @param exClass the expected type of exception which would be thrown from 
callable
    */
   public static <R, E extends Exception> R withCommitProperties(
       Map<String, String> properties, Callable<R> callable, Class<E> exClass) 
throws E {
-    COMMIT_PROPERTIES.set(properties);
+    Map<String, String> props = Maps.newHashMap();
+    properties.forEach(
+        (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, 
""), v));
+
+    COMMIT_PROPERTIES.set(props);
     try {
       return callable.call();
     } catch (Throwable e) {
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 5e819200f5..83d8953735 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -32,10 +32,12 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -425,8 +427,14 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     Thread writerThread =
         new Thread(
             () -> {
-              Map<String, String> properties = Maps.newHashMap();
-              properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName()));
+              Map<String, String> properties =
+                  ImmutableMap.of(
+                      "writer-thread",
+                      String.valueOf(Thread.currentThread().getName()),
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+                      "someValue",
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+                      "anotherValue");
               CommitMetadata.withCommitProperties(
                   properties,
                   () -> {
@@ -442,8 +450,10 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals(
-        "test-extra-commit-message-writer-thread", 
snapshots.get(1).summary().get("writer-thread"));
+    Assertions.assertThat(snapshots.get(1).summary())
+        .containsEntry("writer-thread", 
"test-extra-commit-message-writer-thread")
+        .containsEntry("extra-key", "someValue")
+        .containsEntry("another-key", "anotherValue");
   }
 
   @Test
@@ -459,8 +469,14 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     Thread writerThread =
         new Thread(
             () -> {
-              Map<String, String> properties = Maps.newHashMap();
-              properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName()));
+              Map<String, String> properties =
+                  ImmutableMap.of(
+                      "writer-thread",
+                      String.valueOf(Thread.currentThread().getName()),
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+                      "someValue",
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+                      "anotherValue");
               CommitMetadata.withCommitProperties(
                   properties,
                   () -> {
@@ -477,7 +493,9 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals(
-        "test-extra-commit-message-delete-thread", 
snapshots.get(1).summary().get("writer-thread"));
+    Assertions.assertThat(snapshots.get(1).summary())
+        .containsEntry("writer-thread", 
"test-extra-commit-message-delete-thread")
+        .containsEntry("extra-key", "someValue")
+        .containsEntry("another-key", "anotherValue");
   }
 }

Reply via email to