This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c8e140d640 Spark 3.4: Remove 'snapshot-property' prefix in CommitMetadata properties (#7986)
c8e140d640 is described below
commit c8e140d640564d1b02525c53be23c3628c2a5249
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Jul 5 14:27:09 2023 +0200
Spark 3.4: Remove 'snapshot-property' prefix in CommitMetadata properties (#7986)
---
docs/spark-configuration.md | 4 +--
.../org/apache/iceberg/spark/CommitMetadata.java | 12 ++++++--
.../spark/source/TestDataSourceOptions.java | 34 +++++++++++++++++-----
3 files changed, 38 insertions(+), 12 deletions(-)
diff --git a/docs/spark-configuration.md b/docs/spark-configuration.md
index 8d02246904..3a91f21176 100644
--- a/docs/spark-configuration.md
+++ b/docs/spark-configuration.md
@@ -189,13 +189,13 @@ df.write
| write-format | Table write.format.default | File format to use for
this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's
write.target-file-size-bytes |
| check-nullability | true | Sets the nullable
check on fields |
-| snapshot-property._custom-key_ | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
+| snapshot-property._custom-key_ | null | Adds an entry with custom-key and corresponding value in the snapshot summary (the `snapshot-property.` prefix is only required for DSv2) |
| fanout-enabled | false | Overrides this table's
write.spark.fanout.enabled |
| check-ordering | true | Checks if input schema and table schema
are same |
| isolation-level | null | Desired isolation level for Dataframe overwrite
operations. `null` => no checks (for idempotent writes), `serializable` =>
check for concurrent inserts or deletes in destination partitions, `snapshot`
=> checks for concurrent deletes in destination partitions. |
| validate-from-snapshot-id | null | If isolation level is set, id of base
snapshot from which to check concurrent write conflicts into a table. Should be
the snapshot before any reads from the table. Can be obtained via [Table
API](../../api#table-metadata) or [Snapshots
table](../spark-queries#snapshots). If null, the table's oldest known snapshot
is used. |
-CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. Here is an example:
+CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with `snapshot-property.`, then that prefix will be removed from each property. Here is an example:
```java
import org.apache.iceberg.spark.CommitMetadata;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
index 641b957d11..ea400a7792 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ExceptionUtil;
/** utility class to accept thread local commit properties */
@@ -35,13 +37,19 @@ public class CommitMetadata {
* running the code wrapped as a caller, and any snapshot committed within
the callable object
* will be attached with the metadata defined in properties
*
- * @param properties extra commit metadata to attach to the snapshot committed within callable
+ * @param properties extra commit metadata to attach to the snapshot committed within callable.
+ * The prefix will be removed for properties starting with {@link
+ * SnapshotSummary#EXTRA_METADATA_PREFIX}
* @param callable the code to be executed
* @param exClass the expected type of exception which would be thrown from callable
*/
public static <R, E extends Exception> R withCommitProperties(
Map<String, String> properties, Callable<R> callable, Class<E> exClass)
throws E {
- COMMIT_PROPERTIES.set(properties);
+ Map<String, String> props = Maps.newHashMap();
+ properties.forEach(
+ (k, v) -> props.put(k.replace(SnapshotSummary.EXTRA_METADATA_PREFIX, ""), v));
+
+ COMMIT_PROPERTIES.set(props);
try {
return callable.call();
} catch (Throwable e) {
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 5e819200f5..83d8953735 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -32,10 +32,12 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
@@ -425,8 +427,14 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "writer-thread",
+ String.valueOf(Thread.currentThread().getName()),
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+ "someValue",
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+ "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -442,8 +450,10 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
- "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+ .containsEntry("writer-thread", "test-extra-commit-message-writer-thread")
+ .containsEntry("extra-key", "someValue")
+ .containsEntry("another-key", "anotherValue");
}
@Test
@@ -459,8 +469,14 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
Thread writerThread =
new Thread(
() -> {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ Map<String, String> properties =
+ ImmutableMap.of(
+ "writer-thread",
+ String.valueOf(Thread.currentThread().getName()),
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key",
+ "someValue",
+ SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key",
+ "anotherValue");
CommitMetadata.withCommitProperties(
properties,
() -> {
@@ -477,7 +493,9 @@ public class TestDataSourceOptions extends
SparkTestBaseWithCatalog {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Assert.assertEquals(2, snapshots.size());
Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
- Assert.assertEquals(
- "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
+ Assertions.assertThat(snapshots.get(1).summary())
+ .containsEntry("writer-thread", "test-extra-commit-message-delete-thread")
+ .containsEntry("extra-key", "someValue")
+ .containsEntry("another-key", "anotherValue");
}
}