This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9607a52348 Spark 3.1, 3.2, 3.3: Backport removal of snapshot-property 
in CommitMetadata properties (#7991)
9607a52348 is described below

commit 9607a52348a28048603f465f4a62568db4db453b
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jul 11 15:53:36 2023 +0200

    Spark 3.1, 3.2, 3.3: Backport removal of snapshot-property in 
CommitMetadata properties (#7991)
---
 .../org/apache/iceberg/spark/CommitMetadata.java   | 12 ++++++--
 .../spark/source/TestDataSourceOptions.java        | 30 ++++++++++++-------
 .../org/apache/iceberg/spark/CommitMetadata.java   | 12 ++++++--
 .../spark/source/TestDataSourceOptions.java        | 35 +++++++++++++++++-----
 .../org/apache/iceberg/spark/CommitMetadata.java   | 12 ++++++--
 .../spark/source/TestDataSourceOptions.java        | 35 +++++++++++++++++-----
 6 files changed, 103 insertions(+), 33 deletions(-)

diff --git 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
 
 import java.util.Map;
 import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ExceptionUtil;
 
 /** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
    * running the code wrapped as a caller, and any snapshot committed within 
the callable object
    * will be attached with the metadata defined in properties
    *
-   * @param properties extra commit metadata to attach to the snapshot 
committed within callable
+   * @param properties extra commit metadata to attach to the snapshot 
committed within callable.
+   *     The prefix will be removed for properties starting with {@link
+   *     SnapshotSummary#EXTRA_METADATA_PREFIX}
    * @param callable the code to be executed
    * @param exClass the expected type of exception which would be thrown from 
callable
    */
   public static <R, E extends Exception> R withCommitProperties(
       Map<String, String> properties, Callable<R> callable, Class<E> exClass) 
throws E {
-    COMMIT_PROPERTIES.set(properties);
+    Map<String, String> props = Maps.newHashMap();
+    properties.forEach(
+        (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, 
""), v));
+
+    COMMIT_PROPERTIES.set(props);
     try {
       return callable.call();
     } catch (Throwable e) {
diff --git 
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7504745640..5eeb87d747 100644
--- 
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ 
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -34,13 +33,14 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -52,6 +52,7 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -427,8 +428,14 @@ public class TestDataSourceOptions {
     Thread writerThread =
         new Thread(
             () -> {
-              Map<String, String> properties = Maps.newHashMap();
-              properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName()));
+              Map<String, String> properties =
+                  ImmutableMap.of(
+                      "writer-thread",
+                      String.valueOf(Thread.currentThread().getName()),
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+                      "someValue",
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+                      "anotherValue");
               CommitMetadata.withCommitProperties(
                   properties,
                   () -> {
@@ -440,12 +447,13 @@ public class TestDataSourceOptions {
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    
Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assertions.assertThat(snapshots.get(1).summary())
+        .containsEntry("writer-thread", 
"test-extra-commit-message-writer-thread")
+        .containsEntry("extra-key", "someValue")
+        .containsEntry("another-key", "anotherValue");
   }
 }
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
 
 import java.util.Map;
 import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ExceptionUtil;
 
 /** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
    * running the code wrapped as a caller, and any snapshot committed within 
the callable object
    * will be attached with the metadata defined in properties
    *
-   * @param properties extra commit metadata to attach to the snapshot 
committed within callable
+   * @param properties extra commit metadata to attach to the snapshot 
committed within callable.
+   *     The prefix will be removed for properties starting with {@link
+   *     SnapshotSummary#EXTRA_METADATA_PREFIX}
    * @param callable the code to be executed
    * @param exClass the expected type of exception which would be thrown from 
callable
    */
   public static <R, E extends Exception> R withCommitProperties(
       Map<String, String> properties, Callable<R> callable, Class<E> exClass) 
throws E {
-    COMMIT_PROPERTIES.set(properties);
+    Map<String, String> props = Maps.newHashMap();
+    properties.forEach(
+        (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, 
""), v));
+
+    COMMIT_PROPERTIES.set(props);
     try {
       return callable.call();
     } catch (Throwable e) {
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 06e4965a06..dac560eb1a 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -33,10 +33,12 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -53,6 +55,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -428,8 +431,14 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     Thread writerThread =
         new Thread(
             () -> {
-              Map<String, String> properties = Maps.newHashMap();
-              properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName()));
+              Map<String, String> properties =
+                  ImmutableMap.of(
+                      "writer-thread",
+                      String.valueOf(Thread.currentThread().getName()),
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+                      "someValue",
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+                      "anotherValue");
               CommitMetadata.withCommitProperties(
                   properties,
                   () -> {
@@ -445,8 +454,10 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals(
-        "test-extra-commit-message-writer-thread", 
snapshots.get(1).summary().get("writer-thread"));
+    Assertions.assertThat(snapshots.get(1).summary())
+        .containsEntry("writer-thread", 
"test-extra-commit-message-writer-thread")
+        .containsEntry("extra-key", "someValue")
+        .containsEntry("another-key", "anotherValue");
   }
 
   @Test
@@ -462,8 +473,14 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     Thread writerThread =
         new Thread(
             () -> {
-              Map<String, String> properties = Maps.newHashMap();
-              properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName()));
+              Map<String, String> properties =
+                  ImmutableMap.of(
+                      "writer-thread",
+                      String.valueOf(Thread.currentThread().getName()),
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+                      "someValue",
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+                      "anotherValue");
               CommitMetadata.withCommitProperties(
                   properties,
                   () -> {
@@ -480,7 +497,9 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals(
-        "test-extra-commit-message-delete-thread", 
snapshots.get(1).summary().get("writer-thread"));
+    Assertions.assertThat(snapshots.get(1).summary())
+        .containsEntry("writer-thread", 
"test-extra-commit-message-delete-thread")
+        .containsEntry("extra-key", "someValue")
+        .containsEntry("another-key", "anotherValue");
   }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
 
 import java.util.Map;
 import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ExceptionUtil;
 
 /** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
    * running the code wrapped as a caller, and any snapshot committed within 
the callable object
    * will be attached with the metadata defined in properties
    *
-   * @param properties extra commit metadata to attach to the snapshot 
committed within callable
+   * @param properties extra commit metadata to attach to the snapshot 
committed within callable.
+   *     The prefix will be removed for properties starting with {@link
+   *     SnapshotSummary#EXTRA_METADATA_PREFIX}
    * @param callable the code to be executed
    * @param exClass the expected type of exception which would be thrown from 
callable
    */
   public static <R, E extends Exception> R withCommitProperties(
       Map<String, String> properties, Callable<R> callable, Class<E> exClass) 
throws E {
-    COMMIT_PROPERTIES.set(properties);
+    Map<String, String> props = Maps.newHashMap();
+    properties.forEach(
+        (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, 
""), v));
+
+    COMMIT_PROPERTIES.set(props);
     try {
       return callable.call();
     } catch (Throwable e) {
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 342d8085b1..7e254960e7 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -33,10 +33,12 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -53,6 +55,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -428,8 +431,14 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     Thread writerThread =
         new Thread(
             () -> {
-              Map<String, String> properties = Maps.newHashMap();
-              properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName()));
+              Map<String, String> properties =
+                  ImmutableMap.of(
+                      "writer-thread",
+                      String.valueOf(Thread.currentThread().getName()),
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+                      "someValue",
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+                      "anotherValue");
               CommitMetadata.withCommitProperties(
                   properties,
                   () -> {
@@ -445,8 +454,10 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals(
-        "test-extra-commit-message-writer-thread", 
snapshots.get(1).summary().get("writer-thread"));
+    Assertions.assertThat(snapshots.get(1).summary())
+        .containsEntry("writer-thread", 
"test-extra-commit-message-writer-thread")
+        .containsEntry("extra-key", "someValue")
+        .containsEntry("another-key", "anotherValue");
   }
 
   @Test
@@ -462,8 +473,14 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     Thread writerThread =
         new Thread(
             () -> {
-              Map<String, String> properties = Maps.newHashMap();
-              properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName()));
+              Map<String, String> properties =
+                  ImmutableMap.of(
+                      "writer-thread",
+                      String.valueOf(Thread.currentThread().getName()),
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+                      "someValue",
+                      SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+                      "anotherValue");
               CommitMetadata.withCommitProperties(
                   properties,
                   () -> {
@@ -480,7 +497,9 @@ public class TestDataSourceOptions extends 
SparkTestBaseWithCatalog {
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
     Assert.assertEquals(2, snapshots.size());
     Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
-    Assert.assertEquals(
-        "test-extra-commit-message-delete-thread", 
snapshots.get(1).summary().get("writer-thread"));
+    Assertions.assertThat(snapshots.get(1).summary())
+        .containsEntry("writer-thread", 
"test-extra-commit-message-delete-thread")
+        .containsEntry("extra-key", "someValue")
+        .containsEntry("another-key", "anotherValue");
   }
 }

Reply via email to