This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fa6c51094 [GOBBLIN-2157] Copy table properties in iceberg distcp (#4056)
4fa6c51094 is described below

commit 4fa6c51094ea2b70ee8175d7cb00e076bbef3d0c
Author: William Lo <[email protected]>
AuthorDate: Thu Sep 19 17:34:31 2024 -0400

    [GOBBLIN-2157] Copy table properties in iceberg distcp (#4056)
    
    * Replicate table properties when registering an Iceberg table with the source table metadata
---
 .../data/management/copy/iceberg/IcebergTable.java |  5 ++--
 .../management/copy/iceberg/IcebergTableTest.java  | 30 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 1a56a8d93c..e802e10297 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -212,8 +212,9 @@ public class IcebergTable {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, 
in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata 
dstMetadata) {
     if (dstMetadata != null) {
-      // use current destination metadata as 'base metadata' and source as 
'updated metadata' while committing
-      this.tableOps.commit(dstMetadata, 
srcMetadata.replaceProperties(dstMetadata.properties()));
+      // Use current destination metadata as 'base metadata', but commit the 
source-side metadata
+      // to synchronize source-side property deletion over to the destination
+      this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 096962320a..a1a29444ed 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -35,6 +36,7 @@ import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +49,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 
@@ -181,6 +184,33 @@ public class IcebergTableTest extends HiveMetastoreTest {
     }
   }
 
+  /** Verify that registerIcebergTable will update existing table properties */
+  @Test
+  public void testNewTablePropertiesAreRegistered() throws Exception {
+    Map<String, String> srcTableProperties = Maps.newHashMap();
+    Map<String, String> destTableProperties = Maps.newHashMap();
+
+    srcTableProperties.put("newKey", "newValue");
+    // Expect the old value to be overwritten by the new value
+    srcTableProperties.put("testKey", "testValueNew");
+    destTableProperties.put("testKey", "testValueOld");
+    // Expect existing property values to be deleted if it does not exist on 
the source
+    destTableProperties.put("deletedTableProperty", 
"deletedTablePropertyValue");
+
+    TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable");
+    catalog.createTable(destTableId, icebergSchema, null, destTableProperties);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, 
catalog.newTableOps(destTableId), catalogUri);
+    // Mock a source table with the same table UUID copying new properties
+    TableMetadata newSourceTableProperties = 
destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties);
+
+    destIcebergTable.registerIcebergTable(newSourceTableProperties, 
destIcebergTable.accessTableMetadata());
+    
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().size(), 
2);
+    
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("newKey"),
 "newValue");
+    
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("testKey"),
 "testValueNew");
+    
Assert.assertNull(destIcebergTable.accessTableMetadata().properties().get("deletedTableProperty"));
+  }
+
   /** full validation for a particular {@link IcebergSnapshotInfo} */
   protected void verifySnapshotInfo(IcebergSnapshotInfo snapshotInfo, 
List<List<String>> perSnapshotFilesets, int overallNumSnapshots) {
     // verify metadata file

Reply via email to