This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a4aef817326 HIVE-27593: Iceberg: Keep Iceberg properties in sync with
HMS (Butao Zhang, reviewed by Denys Kuzmenko)
a4aef817326 is described below
commit a4aef8173269d213eb13666baf1751bf110579c8
Author: Butao Zhang <[email protected]>
AuthorDate: Wed Nov 8 17:15:35 2023 +0800
HIVE-27593: Iceberg: Keep Iceberg properties in sync with HMS (Butao Zhang,
reviewed by Denys Kuzmenko)
Closes #4573
---
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 36 +++++++++++++----
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 47 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 7 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index f174bd7b2fe..540a3b8638c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -128,6 +128,10 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.iceberg.TableProperties.DELETE_MODE;
+import static org.apache.iceberg.TableProperties.MERGE_MODE;
+import static org.apache.iceberg.TableProperties.UPDATE_MODE;
+
public class HiveIcebergMetaHook implements HiveMetaHook {
private static final Logger LOG =
LoggerFactory.getLogger(HiveIcebergMetaHook.class);
public static final Map<String, String> COMMON_HMS_PROPERTIES =
ImmutableMap.of(
@@ -428,7 +432,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
assertNotMigratedTable(hmsTable.getParameters(), "CHANGE COLUMN");
handleChangeColumn(hmsTable);
} else {
- setDeleteModeOnTableProperties(icebergTable, hmsTable.getParameters());
+ setDeleteModeOnTableProperties(icebergTable, hmsTable.getParameters(),
context);
assertNotCrossTableMetadataLocationChange(hmsTable.getParameters(),
context);
}
@@ -741,7 +745,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
// Remove creation related properties
PARAMETERS_TO_REMOVE.forEach(hmsParams::remove);
- setDeleteModeOnTableProperties(null, hmsParams);
+ setDeleteModeOnTableProperties(null, hmsParams, null);
}
/**
@@ -981,14 +985,32 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
}
// TODO: remove this if copy-on-write mode gets implemented in Hive
- private static void setDeleteModeOnTableProperties(Table icebergTable,
Map<String, String> newProps) {
+ private void setDeleteModeOnTableProperties(Table icebergTbl, Map<String,
String> newProps,
+ EnvironmentContext context) {
// Hive only supports merge-on-read delete mode, it will actually throw an error if DML operations are attempted on
// tables that don't have this (the default is copy-on-write). We set this at table creation and v1->v2 conversion.
- if ((icebergTable == null || ((BaseTable)
icebergTable).operations().current().formatVersion() == 1) &&
+ if ((icebergTbl == null || ((BaseTable)
icebergTbl).operations().current().formatVersion() == 1) &&
IcebergTableUtil.isV2Table(newProps)) {
- newProps.put(TableProperties.DELETE_MODE,
HiveIcebergStorageHandler.MERGE_ON_READ);
- newProps.put(TableProperties.UPDATE_MODE,
HiveIcebergStorageHandler.MERGE_ON_READ);
- newProps.put(TableProperties.MERGE_MODE,
HiveIcebergStorageHandler.MERGE_ON_READ);
+ List<String> writeModeList = Arrays.asList(DELETE_MODE, UPDATE_MODE,
MERGE_MODE);
+ writeModeList.stream()
+ .filter(writeMode -> catalogProperties.get(writeMode) == null)
+ .forEach(writeMode -> {
+ catalogProperties.put(writeMode,
HiveIcebergStorageHandler.MERGE_ON_READ);
+ newProps.put(writeMode, HiveIcebergStorageHandler.MERGE_ON_READ);
+ });
+
+ if (context != null) {
+ Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR);
+ Map<String, String> contextProperties = context.getProperties();
+ if (contextProperties.containsKey(SET_PROPERTIES)) {
+ String propValue = context.getProperties().get(SET_PROPERTIES);
+ String writeModeStr = writeModeList.stream().filter(writeMode ->
+
!splitter.splitToList(propValue).contains(writeMode)).collect(Collectors.joining("'"));
+ if (!writeModeStr.isEmpty()) {
+ contextProperties.put(SET_PROPERTIES, propValue + "'" +
writeModeStr);
+ }
+ }
+ }
}
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index dc17e472b9c..0baf210340f 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -1928,6 +1928,53 @@ public class TestHiveIcebergStorageHandlerNoScan {
Assert.assertTrue(hmsTblLocation.getFileSystem(shell.getHiveConf()).exists(hmsTblLocation));
}
+ @Test
+ public void testSnycProperties() throws TException, InterruptedException {
+ Assume.assumeTrue("This test is only for hive catalog", testTableType ==
TestTables.TestTableType.HIVE_CATALOG);
+
+ // Test create v2 iceberg table and check iceberg properties & hms properties
+ TableIdentifier identifier = TableIdentifier.of("default", "customers_v2");
+ shell.executeStatement("CREATE TABLE customers_v2 (id int, name string)
Stored by Iceberg stored as ORC " +
+ "TBLPROPERTIES
('format-version'='2','write.delete.mode'='copy-on-write')");
+ org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
+ org.apache.hadoop.hive.metastore.api.Table hmsTable =
shell.metastore().getTable("default", "customers_v2");
+ Map<String, String> icePros = icebergTable.properties();
+ Map<String, String> hmsProps = hmsTable.getParameters();
+ Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE),
HiveIcebergStorageHandler.COPY_ON_WRITE);
+ Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE),
HiveIcebergStorageHandler.MERGE_ON_READ);
+ Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE),
HiveIcebergStorageHandler.MERGE_ON_READ);
+ Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE),
hmsProps.get(TableProperties.DELETE_MODE));
+ Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE),
hmsProps.get(TableProperties.UPDATE_MODE));
+ Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE),
hmsProps.get(TableProperties.MERGE_MODE));
+
+ // Test create v1 iceberg table and check its properties before and after it upgrades to v2
+ identifier = TableIdentifier.of("default", "customers_v1");
+ shell.executeStatement("CREATE TABLE customers_v1 (id int, name string)
Stored by Iceberg stored as ORC");
+ icebergTable = testTables.loadTable(identifier);
+ hmsTable = shell.metastore().getTable("default", "customers_v1");
+ icePros = icebergTable.properties();
+ hmsProps = hmsTable.getParameters();
+ // check v1 iceberg table properties
+ Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE), null);
+ Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE), null);
+ Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), null);
+ Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE),
hmsProps.get(TableProperties.DELETE_MODE));
+ Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE),
hmsProps.get(TableProperties.UPDATE_MODE));
+ Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE),
hmsProps.get(TableProperties.MERGE_MODE));
+ // check table properties after upgrading to v2
+ shell.executeStatement("ALTER TABLE customers_v1 SET TBLPROPERTIES
('format-version'='2')");
+ icebergTable = testTables.loadTable(identifier);
+ hmsTable = shell.metastore().getTable("default", "customers_v1");
+ icePros = icebergTable.properties();
+ hmsProps = hmsTable.getParameters();
+ Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE),
HiveIcebergStorageHandler.MERGE_ON_READ);
+ Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE),
HiveIcebergStorageHandler.MERGE_ON_READ);
+ Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE),
HiveIcebergStorageHandler.MERGE_ON_READ);
+ Assert.assertEquals(icePros.get(TableProperties.DELETE_MODE),
hmsProps.get(TableProperties.DELETE_MODE));
+ Assert.assertEquals(icePros.get(TableProperties.UPDATE_MODE),
hmsProps.get(TableProperties.UPDATE_MODE));
+ Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE),
hmsProps.get(TableProperties.MERGE_MODE));
+ }
+
private String
getCurrentSnapshotForHiveCatalogTable(org.apache.iceberg.Table icebergTable) {
return ((BaseMetastoreTableOperations) ((BaseTable)
icebergTable).operations()).currentMetadataLocation();
}