This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a4aef817326 HIVE-27593: Iceberg: Keep Iceberg properties in sync with 
HMS (Butao Zhang, reviewed by Denys Kuzmenko)
a4aef817326 is described below

commit a4aef8173269d213eb13666baf1751bf110579c8
Author: Butao Zhang <[email protected]>
AuthorDate: Wed Nov 8 17:15:35 2023 +0800

    HIVE-27593: Iceberg: Keep Iceberg properties in sync with HMS (Butao Zhang, 
reviewed by Denys Kuzmenko)
    
    Closes #4573
---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       | 36 +++++++++++++----
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  | 47 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 7 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index f174bd7b2fe..540a3b8638c 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -128,6 +128,10 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.TableProperties.DELETE_MODE;
+import static org.apache.iceberg.TableProperties.MERGE_MODE;
+import static org.apache.iceberg.TableProperties.UPDATE_MODE;
+
 public class HiveIcebergMetaHook implements HiveMetaHook {
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergMetaHook.class);
   public static final Map<String, String> COMMON_HMS_PROPERTIES = 
ImmutableMap.of(
@@ -428,7 +432,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       assertNotMigratedTable(hmsTable.getParameters(), "CHANGE COLUMN");
       handleChangeColumn(hmsTable);
     } else {
-      setDeleteModeOnTableProperties(icebergTable, hmsTable.getParameters());
+      setDeleteModeOnTableProperties(icebergTable, hmsTable.getParameters(), 
context);
       assertNotCrossTableMetadataLocationChange(hmsTable.getParameters(), 
context);
     }
 
@@ -741,7 +745,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     // Remove creation related properties
     PARAMETERS_TO_REMOVE.forEach(hmsParams::remove);
 
-    setDeleteModeOnTableProperties(null, hmsParams);
+    setDeleteModeOnTableProperties(null, hmsParams, null);
   }
 
   /**
@@ -981,14 +985,32 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   }
 
   // TODO: remove this if copy-on-write mode gets implemented in Hive
-  private static void setDeleteModeOnTableProperties(Table icebergTable, 
Map<String, String> newProps) {
+  private void setDeleteModeOnTableProperties(Table icebergTbl, Map<String, 
String> newProps,
+      EnvironmentContext context) {
     // Hive only supports merge-on-read delete mode, it will actually throw an 
error if DML operations are attempted on
     // tables that don't have this (the default is copy-on-write). We set this 
at table creation and v1->v2 conversion.
-    if ((icebergTable == null || ((BaseTable) 
icebergTable).operations().current().formatVersion() == 1) &&
+    if ((icebergTbl == null || ((BaseTable) 
icebergTbl).operations().current().formatVersion() == 1) &&
         IcebergTableUtil.isV2Table(newProps)) {
-      newProps.put(TableProperties.DELETE_MODE, 
HiveIcebergStorageHandler.MERGE_ON_READ);
-      newProps.put(TableProperties.UPDATE_MODE, 
HiveIcebergStorageHandler.MERGE_ON_READ);
-      newProps.put(TableProperties.MERGE_MODE, 
HiveIcebergStorageHandler.MERGE_ON_READ);
+      List<String> writeModeList = Arrays.asList(DELETE_MODE, UPDATE_MODE, 
MERGE_MODE);
+      writeModeList.stream()
+          .filter(writeMode -> catalogProperties.get(writeMode) == null)
+          .forEach(writeMode -> {
+            catalogProperties.put(writeMode, 
HiveIcebergStorageHandler.MERGE_ON_READ);
+            newProps.put(writeMode, HiveIcebergStorageHandler.MERGE_ON_READ);
+          });
+
+      if (context != null) {
+        Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR);
+        Map<String, String> contextProperties = context.getProperties();
+        if (contextProperties.containsKey(SET_PROPERTIES)) {
+          String propValue = context.getProperties().get(SET_PROPERTIES);
+          String writeModeStr = writeModeList.stream().filter(writeMode ->
+              
!splitter.splitToList(propValue).contains(writeMode)).collect(Collectors.joining("'"));
+          if (!writeModeStr.isEmpty()) {
+            contextProperties.put(SET_PROPERTIES, propValue + "'" + 
writeModeStr);
+          }
+        }
+      }
     }
   }
 
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index dc17e472b9c..0baf210340f 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -1928,6 +1928,53 @@ public class TestHiveIcebergStorageHandlerNoScan {
     
Assert.assertTrue(hmsTblLocation.getFileSystem(shell.getHiveConf()).exists(hmsTblLocation));
   }
 
+  @Test
+  public void testSyncProperties() throws TException, InterruptedException {
+    Assume.assumeTrue("This test is only for hive catalog", testTableType == 
TestTables.TestTableType.HIVE_CATALOG);
+
+    // Test create v2 iceberg table and check iceberg properties & hms 
properties
+    TableIdentifier identifier = TableIdentifier.of("default", "customers_v2");
+    shell.executeStatement("CREATE TABLE customers_v2 (id int, name string) 
Stored by Iceberg stored as ORC " +
+        "TBLPROPERTIES 
('format-version'='2','write.delete.mode'='copy-on-write')");
+    org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = 
shell.metastore().getTable("default", "customers_v2");
+    Map<String, String> icePros = icebergTable.properties();
+    Map<String, String> hmsProps = hmsTable.getParameters();
+    Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE), 
HiveIcebergStorageHandler.COPY_ON_WRITE);
+    Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE), 
HiveIcebergStorageHandler.MERGE_ON_READ);
+    Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), 
HiveIcebergStorageHandler.MERGE_ON_READ);
+    Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE), 
hmsProps.get(TableProperties.DELETE_MODE));
+    Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE), 
hmsProps.get(TableProperties.UPDATE_MODE));
+    Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), 
hmsProps.get(TableProperties.MERGE_MODE));
+
+    // Test create v1 iceberg table and check its properties before and after 
it upgrades to v2
+    identifier = TableIdentifier.of("default", "customers_v1");
+    shell.executeStatement("CREATE TABLE customers_v1 (id int, name string) 
Stored by Iceberg stored as ORC");
+    icebergTable = testTables.loadTable(identifier);
+    hmsTable = shell.metastore().getTable("default", "customers_v1");
+    icePros = icebergTable.properties();
+    hmsProps = hmsTable.getParameters();
+    // check v1 iceberg table properties
+    Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE), null);
+    Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE), null);
+    Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), null);
+    Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE), 
hmsProps.get(TableProperties.DELETE_MODE));
+    Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE), 
hmsProps.get(TableProperties.UPDATE_MODE));
+    Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), 
hmsProps.get(TableProperties.MERGE_MODE));
+    // check table properties after upgrading to v2
+    shell.executeStatement("ALTER TABLE customers_v1 SET TBLPROPERTIES 
('format-version'='2')");
+    icebergTable = testTables.loadTable(identifier);
+    hmsTable = shell.metastore().getTable("default", "customers_v1");
+    icePros = icebergTable.properties();
+    hmsProps = hmsTable.getParameters();
+    Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE), 
HiveIcebergStorageHandler.MERGE_ON_READ);
+    Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE), 
HiveIcebergStorageHandler.MERGE_ON_READ);
+    Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), 
HiveIcebergStorageHandler.MERGE_ON_READ);
+    Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE), 
hmsProps.get(TableProperties.DELETE_MODE));
+    Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE), 
hmsProps.get(TableProperties.UPDATE_MODE));
+    Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), 
hmsProps.get(TableProperties.MERGE_MODE));
+  }
+
   private String 
getCurrentSnapshotForHiveCatalogTable(org.apache.iceberg.Table icebergTable) {
     return ((BaseMetastoreTableOperations) ((BaseTable) 
icebergTable).operations()).currentMetadataLocation();
   }

Reply via email to