This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9607a52348 Spark 3.1, 3.2, 3.3: Backport removal of snapshot-property
in CommitMetadata properties (#7991)
9607a52348 is described below
commit 9607a52348a28048603f465f4a62568db4db453b
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jul 11 15:53:36 2023 +0200
Spark 3.1, 3.2, 3.3: Backport removal of snapshot-property in
CommitMetadata properties (#7991)
---
.../org/apache/iceberg/spark/CommitMetadata.java | 12 ++++++--
.../spark/source/TestDataSourceOptions.java | 30 ++++++++++++-------
.../org/apache/iceberg/spark/CommitMetadata.java | 12 ++++++--
.../spark/source/TestDataSourceOptions.java | 35 +++++++++++++++++-----
.../org/apache/iceberg/spark/CommitMetadata.java | 12 ++++++--
.../spark/source/TestDataSourceOptions.java | 35 +++++++++++++++++-----
6 files changed, 103 insertions(+), 33 deletions(-)
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ExceptionUtil;
/** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
* running the code wrapped as a caller, and any snapshot committed within
the callable object
* will be attached with the metadata defined in properties
*
- * @param properties extra commit metadata to attach to the snapshot
committed within callable
+ * @param properties extra commit metadata to attach to the snapshot
committed within callable.
+ * The prefix will be removed for properties starting with {@link
+ * SnapshotSummary#EXTRA_METADATA_PREFIX}
* @param callable the code to be executed
* @param exClass the expected type of exception which would be thrown from
callable
*/
public static <R, E extends Exception> R withCommitProperties(
Map<String, String> properties, Callable<R> callable, Class<E> exClass)
throws E {
- COMMIT_PROPERTIES.set(properties);
+ Map<String, String> props = Maps.newHashMap();
+ properties.forEach(
+ (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX,
""), v));
+
+ COMMIT_PROPERTIES.set(props);
try {
return callable.call();
} catch (Throwable e) {
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7504745640..5eeb87d747 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -34,13 +33,14 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -52,6 +52,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -427,8 +428,14 @@ public class TestDataSourceOptions {
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread",
String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "writer-thread",
+ String.valueOf(Thread.currentThread().getName()),
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+ "someValue",
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+ "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -440,12 +447,13 @@ public class TestDataSourceOptions {
writerThread.setName("test-extra-commit-message-writer-thread");
writerThread.start();
writerThread.join();
- Set<String> threadNames = Sets.newHashSet();
- for (Snapshot snapshot : table.snapshots()) {
- threadNames.add(snapshot.summary().get("writer-thread"));
- }
- Assert.assertEquals(2, threadNames.size());
- Assert.assertTrue(threadNames.contains(null));
-
Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+ .containsEntry("writer-thread",
"test-extra-commit-message-writer-thread")
+ .containsEntry("extra-key", "someValue")
+ .containsEntry("another-key", "anotherValue");
}
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ExceptionUtil;
/** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
* running the code wrapped as a caller, and any snapshot committed within
the callable object
* will be attached with the metadata defined in properties
*
- * @param properties extra commit metadata to attach to the snapshot
committed within callable
+ * @param properties extra commit metadata to attach to the snapshot
committed within callable.
+ * The prefix will be removed for properties starting with {@link
+ * SnapshotSummary#EXTRA_METADATA_PREFIX}
* @param callable the code to be executed
* @param exClass the expected type of exception which would be thrown from
callable
*/
public static <R, E extends Exception> R withCommitProperties(
Map<String, String> properties, Callable<R> callable, Class<E> exClass)
throws E {
- COMMIT_PROPERTIES.set(properties);
+ Map<String, String> props = Maps.newHashMap();
+ properties.forEach(
+ (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX,
""), v));
+
+ COMMIT_PROPERTIES.set(props);
try {
return callable.call();
} catch (Throwable e) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 06e4965a06..dac560eb1a 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -33,10 +33,12 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -53,6 +55,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -428,8 +431,14 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread",
String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "writer-thread",
+ String.valueOf(Thread.currentThread().getName()),
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+ "someValue",
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+ "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -445,8 +454,10 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
- "test-extra-commit-message-writer-thread",
snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+ .containsEntry("writer-thread",
"test-extra-commit-message-writer-thread")
+ .containsEntry("extra-key", "someValue")
+ .containsEntry("another-key", "anotherValue");
}
@Test
@@ -462,8 +473,14 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread",
String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "writer-thread",
+ String.valueOf(Thread.currentThread().getName()),
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+ "someValue",
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+ "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -480,7 +497,9 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
- "test-extra-commit-message-delete-thread",
snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+ .containsEntry("writer-thread",
"test-extra-commit-message-delete-thread")
+ .containsEntry("extra-key", "someValue")
+ .containsEntry("another-key", "anotherValue");
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ExceptionUtil;
/** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
* running the code wrapped as a caller, and any snapshot committed within
the callable object
* will be attached with the metadata defined in properties
*
- * @param properties extra commit metadata to attach to the snapshot
committed within callable
+ * @param properties extra commit metadata to attach to the snapshot
committed within callable.
+ * The prefix will be removed for properties starting with {@link
+ * SnapshotSummary#EXTRA_METADATA_PREFIX}
* @param callable the code to be executed
* @param exClass the expected type of exception which would be thrown from
callable
*/
public static <R, E extends Exception> R withCommitProperties(
Map<String, String> properties, Callable<R> callable, Class<E> exClass)
throws E {
- COMMIT_PROPERTIES.set(properties);
+ Map<String, String> props = Maps.newHashMap();
+ properties.forEach(
+ (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX,
""), v));
+
+ COMMIT_PROPERTIES.set(props);
try {
return callable.call();
} catch (Throwable e) {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 342d8085b1..7e254960e7 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -33,10 +33,12 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -53,6 +55,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -428,8 +431,14 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread",
String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "writer-thread",
+ String.valueOf(Thread.currentThread().getName()),
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+ "someValue",
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+ "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -445,8 +454,10 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
- "test-extra-commit-message-writer-thread",
snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+ .containsEntry("writer-thread",
"test-extra-commit-message-writer-thread")
+ .containsEntry("extra-key", "someValue")
+ .containsEntry("another-key", "anotherValue");
}
@Test
@@ -462,8 +473,14 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread",
String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "writer-thread",
+ String.valueOf(Thread.currentThread().getName()),
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+ "someValue",
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+ "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -480,7 +497,9 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
- "test-extra-commit-message-delete-thread",
snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+ .containsEntry("writer-thread",
"test-extra-commit-message-delete-thread")
+ .containsEntry("extra-key", "someValue")
+ .containsEntry("another-key", "anotherValue");
}
}