This is an automated email from the ASF dual-hosted git repository.
zhangbutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new eb2cac384da HIVE-28015: Iceberg: Add identifier-field-ids support in Hive (#5047)(Butao Zhang, reviewed by Denys Kuzmenko)
eb2cac384da is described below
commit eb2cac384da8e71a049ff44d883ca363938c6a69
Author: Butao Zhang <[email protected]>
AuthorDate: Wed Feb 21 20:51:50 2024 +0800
HIVE-28015: Iceberg: Add identifier-field-ids support in Hive (#5047)(Butao Zhang, reviewed by Denys Kuzmenko)
---
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 55 +++++++++++++++++----
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 56 ++++++++++++++++++++++
.../apache/hadoop/hive/metastore/HiveMetaHook.java | 12 +++++
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 2 +-
4 files changed, 115 insertions(+), 10 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 9a108e51972..94aabe65d43 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -43,9 +43,11 @@ import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.PartitionDropOptions;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -122,6 +124,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructProjection;
import org.apache.thrift.TException;
@@ -194,6 +197,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
@Override
public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ CreateTableRequest request = new CreateTableRequest(hmsTable);
+ preCreateTable(request);
+ }
+ @Override
+ public void preCreateTable(CreateTableRequest request) {
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
if (hmsTable.isTemporary()) {
throw new UnsupportedOperationException("Creation of temporary iceberg tables is not supported.");
}
@@ -234,7 +243,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
// - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
// Iceberg schema and specification generated by the code
- Schema schema = schema(catalogProperties, hmsTable);
+ Set<String> identifierFields = Optional.ofNullable(request.getPrimaryKeys())
+ .map(primaryKeys -> primaryKeys.stream()
+ .map(SQLPrimaryKey::getColumn_name)
+ .collect(Collectors.toSet()))
+ .orElse(Collections.emptySet());
+ Schema schema = schema(catalogProperties, hmsTable, identifierFields);
PartitionSpec spec = spec(conf, schema, hmsTable);
// If there are partition keys specified remove them from the HMS table and add them to the column list
@@ -255,6 +269,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
// Set whether the format is ORC, to be used during vectorization.
setOrcOnlyFilesParam(hmsTable);
+ // Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
+ request.setPrimaryKeys(null);
}
@Override
@@ -384,7 +400,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
preAlterTableProperties = new PreAlterTableProperties();
preAlterTableProperties.tableLocation = sd.getLocation();
preAlterTableProperties.format = sd.getInputFormat();
- preAlterTableProperties.schema = schema(catalogProperties, hmsTable);
+ preAlterTableProperties.schema = schema(catalogProperties, hmsTable, Collections.emptySet());
preAlterTableProperties.partitionKeys = hmsTable.getPartitionKeys();
context.getProperties().put(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE, "true");
@@ -794,19 +810,40 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
return properties;
}
- private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable,
+ Set<String> identifierFields) {
boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) {
return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA));
- } else if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
- // Add partitioning columns to the original column list before creating the Iceberg Schema
- List<FieldSchema> cols = Lists.newArrayList(hmsTable.getSd().getCols());
+ }
+ List<FieldSchema> cols = Lists.newArrayList(hmsTable.getSd().getCols());
+ if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
cols.addAll(hmsTable.getPartitionKeys());
- return HiveSchemaUtil.convert(cols, autoConversion);
- } else {
- return HiveSchemaUtil.convert(hmsTable.getSd().getCols(), autoConversion);
}
+ Schema schema = HiveSchemaUtil.convert(cols, autoConversion);
+
+ return getSchemaWithIdentifierFields(schema, identifierFields);
+ }
+
+ private Schema getSchemaWithIdentifierFields(Schema schema, Set<String> identifierFields) {
+ if (identifierFields == null || identifierFields.isEmpty()) {
+ return schema;
+ }
+ Set<Integer> identifierFieldIds = identifierFields.stream()
+ .map(column -> {
+ Types.NestedField field = schema.findField(column);
+ Preconditions.checkNotNull(field,
+ "Cannot find identifier field ID for the column %s in
schema %s", column, schema);
+ return field.fieldId();
+ })
+ .collect(Collectors.toSet());
+
+ List<Types.NestedField> cols = schema.columns().stream()
+ .map(column -> identifierFieldIds.contains(column.fieldId()) ? column.asRequired() : column)
+ .collect(Collectors.toList());
+
+ return new Schema(cols, identifierFieldIds);
}
private static PartitionSpec spec(Configuration configuration, Schema schema,
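Note: the heart of this file's change is the getSchemaWithIdentifierFields() helper above. Below is a minimal standalone sketch of the same mapping (illustrative only, not part of the commit; class and variable names are invented): resolve each primary key column name to its Iceberg field ID, promote those columns to required, and rebuild the schema carrying the identifier field IDs.

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.Types;

    public class IdentifierFieldsSketch {
      public static void main(String[] args) {
        // A schema as HiveSchemaUtil.convert() might produce it: columns optional by default.
        Schema schema = new Schema(
            Types.NestedField.optional(1, "customer_id", Types.LongType.get()),
            Types.NestedField.optional(2, "first_name", Types.StringType.get()));
        Set<String> identifierFields = Sets.newHashSet("customer_id");

        // Resolve each primary key column name to its Iceberg field ID.
        Set<Integer> identifierFieldIds = identifierFields.stream()
            .map(column -> {
              Types.NestedField field = schema.findField(column);
              Preconditions.checkNotNull(field, "Cannot find column %s in schema", column);
              return field.fieldId();
            })
            .collect(Collectors.toSet());

        // Identifier fields must be required, so promote the matched columns.
        List<Types.NestedField> cols = schema.columns().stream()
            .map(c -> identifierFieldIds.contains(c.fieldId()) ? c.asRequired() : c)
            .collect(Collectors.toList());

        // Rebuild the schema with the identifier field IDs attached.
        System.out.println(new Schema(cols, identifierFieldIds).identifierFieldIds()); // prints [1]
      }
    }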
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index c1bbeb03989..5a63733bd1e 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -75,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -2054,6 +2055,61 @@ public class TestHiveIcebergStorageHandlerNoScan {
Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), hmsProps.get(TableProperties.MERGE_MODE));
}
+ @Test
+ public void testCreateTableWithIdentifierField() {
+ TableIdentifier identifier = TableIdentifier.of("default", "customers");
+ String query = String.format("CREATE EXTERNAL TABLE customers (" +
+ "customer_id BIGINT primary key disable novalidate, " +
+ "first_name STRING, " +
+ "last_name STRING) " +
+ "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
+ testTables.locationForCreateTableSQL(identifier),
+ InputFormatConfig.CATALOG_NAME,
+ testTables.catalogName());
+ shell.executeStatement(query);
+ org.apache.iceberg.Table table = testTables.loadTable(identifier);
+ Assert.assertEquals("Should have new identifier field",
+ Sets.newHashSet(table.schema().findField("customer_id").fieldId()), table.schema().identifierFieldIds());
+ }
+
+ @Test
+ public void testCreateTableWithMultiIdentifierFields() {
+ TableIdentifier identifier = TableIdentifier.of("default", "customers");
+ String query = String.format("CREATE EXTERNAL TABLE customers (" +
+ "customer_id BIGINT," +
+ "first_name STRING, " +
+ "last_name STRING," +
+ "primary key (customer_id, first_name) disable novalidate)
" +
+ "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
+ testTables.locationForCreateTableSQL(identifier),
+ InputFormatConfig.CATALOG_NAME,
+ testTables.catalogName());
+ shell.executeStatement(query);
+ org.apache.iceberg.Table table = testTables.loadTable(identifier);
+ Assert.assertEquals("Should have new two identifier fields",
+ Sets.newHashSet(table.schema().findField("customer_id").fieldId(),
+ table.schema().findField("first_name").fieldId()), table.schema().identifierFieldIds());
+ }
+
+ @Test
+ public void testCreateTableFailedWithNestedIdentifierField() {
+ TableIdentifier identifier = TableIdentifier.of("default", "customers");
+ String query = String.format("CREATE EXTERNAL TABLE
customers_with_nested_column (" +
+ "customer_id BIGINT," +
+ "first_name STRING, " +
+ "last_name STRING, " +
+ "user_info STRUCT<address: STRING, phone: STRING> primary
key disable novalidate) " +
+ "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
+ testTables.locationForCreateTableSQL(identifier),
+ InputFormatConfig.CATALOG_NAME,
+ testTables.catalogName());
+
+ // Iceberg table doesn't support nested column as identifier field.
+ Assert.assertThrows(
+ "Cannot add field user_info as an identifier field: not a
primitive type field",
+ IllegalArgumentException.class, () -> shell.executeStatement(query));
+ }
+
private String getCurrentSnapshotForHiveCatalogTable(org.apache.iceberg.Table icebergTable) {
return ((BaseMetastoreTableOperations) ((BaseTable) icebergTable).operations()).currentMetadataLocation();
}
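Note: the nested-column failure asserted in the last test is raised by Iceberg's own schema validation rather than by Hive. A self-contained sketch (illustrative only, not from the commit) that triggers the same check directly against the Iceberg API:

    import java.util.Collections;

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class NestedIdentifierSketch {
      public static void main(String[] args) {
        // A struct column, analogous to user_info in the test above.
        Types.NestedField userInfo = Types.NestedField.required(1, "user_info",
            Types.StructType.of(Types.NestedField.optional(2, "address", Types.StringType.get())));
        try {
          // Declaring the struct as an identifier field fails validation in the Schema constructor.
          new Schema(Collections.singletonList(userInfo), Collections.singleton(1));
        } catch (IllegalArgumentException e) {
          // Expected message, per the assertion above:
          // "Cannot add field user_info as an identifier field: not a primitive type field"
          System.out.println(e.getMessage());
        }
      }
    }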
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
index 695a3282838..115942b9b8f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -76,6 +77,17 @@ public interface HiveMetaHook {
void preCreateTable(Table table)
throws MetaException;
+ /**
+ * Called before a new table definition is added to the metastore
+ * during CREATE TABLE.
+ *
+ * @param request the whole request to create a new table
+ */
+ default void preCreateTable(CreateTableRequest request)
+ throws MetaException {
+ preCreateTable(request.getTable());
+ }
+
/**
* Called after failure adding a new table definition to the metastore
* during CREATE TABLE.
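Note: because the new overload is a default method that delegates to the legacy Table-based one, existing hooks keep compiling and behaving unchanged; only hooks that care about constraints need to override it. A hypothetical implementation (names invented for illustration) opting into the request-based callback:

    import org.apache.hadoop.hive.metastore.HiveMetaHook;
    import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class ConstraintAwareMetaHook implements HiveMetaHook {
      @Override
      public void preCreateTable(CreateTableRequest request) throws MetaException {
        // The hook now sees constraint metadata before the table reaches the metastore.
        if (request.getPrimaryKeys() != null && !request.getPrimaryKeys().isEmpty()) {
          System.out.println("Primary keys: " + request.getPrimaryKeys());
        }
        preCreateTable(request.getTable());
      }

      @Override
      public void preCreateTable(Table table) throws MetaException {
        // Legacy entry point, still reachable via the request-based overload above.
      }

      // Remaining lifecycle callbacks left empty for brevity.
      @Override public void rollbackCreateTable(Table table) throws MetaException { }
      @Override public void commitCreateTable(Table table) throws MetaException { }
      @Override public void preDropTable(Table table) throws MetaException { }
      @Override public void rollbackDropTable(Table table) throws MetaException { }
      @Override public void commitDropTable(Table table, boolean deleteData) throws MetaException { }
    }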
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 7d484bf44cd..862096cb4d8 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -1481,7 +1481,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
HiveMetaHook hook = getHook(tbl);
if (hook != null) {
- hook.preCreateTable(tbl);
+ hook.preCreateTable(request);
}
boolean success = false;
try {