This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4fa6c51094 [GOBBLIN-2157] Copy table properties in iceberg distcp
(#4056)
4fa6c51094 is described below
commit 4fa6c51094ea2b70ee8175d7cb00e076bbef3d0c
Author: William Lo <[email protected]>
AuthorDate: Thu Sep 19 17:34:31 2024 -0400
[GOBBLIN-2157] Copy table properties in iceberg distcp (#4056)
* Replicate table properties when registering an iceberg table with src
table metadata
---
.../data/management/copy/iceberg/IcebergTable.java | 5 ++--
.../management/copy/iceberg/IcebergTableTest.java | 30 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 1a56a8d93c..e802e10297 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -212,8 +212,9 @@ public class IcebergTable {
* @param dstMetadata is null if destination {@link IcebergTable} is absent,
in which case registration is skipped */
protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata
dstMetadata) {
if (dstMetadata != null) {
- // use current destination metadata as 'base metadata' and source as
'updated metadata' while committing
- this.tableOps.commit(dstMetadata,
srcMetadata.replaceProperties(dstMetadata.properties()));
+ // Use current destination metadata as 'base metadata', but commit the
source-side metadata
+ // to synchronize source-side property deletion over to the destination
+ this.tableOps.commit(dstMetadata, srcMetadata);
}
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 096962320a..a1a29444ed 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -35,6 +36,7 @@ import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +49,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -181,6 +184,33 @@ public class IcebergTableTest extends HiveMetastoreTest {
}
}
+ /** Verify that registerIcebergTable will update existing table properties */
+ @Test
+ public void testNewTablePropertiesAreRegistered() throws Exception {
+ Map<String, String> srcTableProperties = Maps.newHashMap();
+ Map<String, String> destTableProperties = Maps.newHashMap();
+
+ srcTableProperties.put("newKey", "newValue");
+ // Expect the old value to be overwritten by the new value
+ srcTableProperties.put("testKey", "testValueNew");
+ destTableProperties.put("testKey", "testValueOld");
+ // Expect existing properties to be deleted if they do not exist on
the source
+ destTableProperties.put("deletedTableProperty",
"deletedTablePropertyValue");
+
+ TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable");
+ catalog.createTable(destTableId, icebergSchema, null, destTableProperties);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId,
catalog.newTableOps(destTableId), catalogUri);
+ // Mock a source table with the same table UUID copying new properties
+ TableMetadata newSourceTableProperties =
destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties);
+
+ destIcebergTable.registerIcebergTable(newSourceTableProperties,
destIcebergTable.accessTableMetadata());
+
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().size(),
2);
+
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("newKey"),
"newValue");
+
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("testKey"),
"testValueNew");
+
Assert.assertNull(destIcebergTable.accessTableMetadata().properties().get("deletedTableProperty"));
+ }
+
/** full validation for a particular {@link IcebergSnapshotInfo} */
protected void verifySnapshotInfo(IcebergSnapshotInfo snapshotInfo,
List<List<String>> perSnapshotFilesets, int overallNumSnapshots) {
// verify metadata file