This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6c561a71d8 HIVE-26190: Implement create iceberg table with metadata location. (#3259) (Laszlo Pinter, reviewed by Peter Vary)
6c561a71d8 is described below
commit 6c561a71d84d90086650ddd7f7b532eae534fa0d
Author: László Pintér <[email protected]>
AuthorDate: Fri May 6 15:13:07 2022 +0200
HIVE-26190: Implement create iceberg table with metadata location. (#3259) (Laszlo Pinter, reviewed by Peter Vary)
---
.../main/java/org/apache/iceberg/mr/Catalogs.java | 86 +++++++++++++++++-----
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 16 +++-
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 34 +++++++++
.../org/apache/iceberg/mr/hive/TestTables.java | 73 ++++++++++++++++--
.../apache/hadoop/hive/metastore/HiveMetaHook.java | 8 ++
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 4 +-
6 files changed, 194 insertions(+), 27 deletions(-)
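
Before the diff itself, a quick sketch of what the change enables: a CREATE TABLE statement can now point Hive at an existing Iceberg metadata file instead of generating new table metadata. A minimal usage sketch, driven through the tests' TestHiveShell the way the new test below does it; the table name and metadata path here are made up:

    // Hypothetical sketch, not part of the patch: create a Hive table over an
    // existing Iceberg metadata file via the metadata_location table property.
    shell.executeStatement("CREATE EXTERNAL TABLE target STORED BY ICEBERG " +
        "TBLPROPERTIES ('metadata_location'='/warehouse/source/metadata/00002-abc.metadata.json')");
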
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index ccd3c6e196..d716e2d614 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -137,26 +137,12 @@ public final class Catalogs {
* @return the created Iceberg table
*/
public static Table createTable(Configuration conf, Properties props) {
- String schemaString = props.getProperty(InputFormatConfig.TABLE_SCHEMA);
- Preconditions.checkNotNull(schemaString, "Table schema not set");
- Schema schema = SchemaParser.fromJson(props.getProperty(InputFormatConfig.TABLE_SCHEMA));
-
- String specString = props.getProperty(InputFormatConfig.PARTITION_SPEC);
- PartitionSpec spec = PartitionSpec.unpartitioned();
- if (specString != null) {
- spec = PartitionSpecParser.fromJson(schema, specString);
- }
-
+ Schema schema = schema(props);
+ PartitionSpec spec = spec(props, schema);
String location = props.getProperty(LOCATION);
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
- // Create a table property map without the controlling properties
- Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
- for (Object key : props.keySet()) {
- if (!PROPERTIES_TO_REMOVE.contains(key)) {
- map.put(key.toString(), props.get(key).toString());
- }
- }
+ Map<String, String> map = filterIcebergTableProperties(props);
Optional<Catalog> catalog = loadCatalog(conf, catalogName);
@@ -214,6 +200,31 @@ public final class Catalogs {
return getCatalogProperties(conf, catalogName, catalogType).get(CatalogProperties.CATALOG_IMPL) == null;
}
+ /**
+ * Register a table with the configured catalog if it does not exist.
+ * @param conf a Hadoop conf
+ * @param props the controlling properties
+ * @param metadataLocation the location of a metadata file
+ * @return the created Iceberg table
+ */
+ public static Table registerTable(Configuration conf, Properties props, String metadataLocation) {
+ Schema schema = schema(props);
+ PartitionSpec spec = spec(props, schema);
+ Map<String, String> map = filterIcebergTableProperties(props);
+ String location = props.getProperty(LOCATION);
+ String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
+
+ Optional<Catalog> catalog = loadCatalog(conf, catalogName);
+ if (catalog.isPresent()) {
+ String name = props.getProperty(NAME);
+ Preconditions.checkNotNull(name, "Table identifier not set");
+ return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation);
+ }
+
+ Preconditions.checkNotNull(location, "Table location not set");
+ return new HadoopTables(conf).create(schema, spec, map, location);
+ }
+
@VisibleForTesting
static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
String catalogType = getCatalogType(conf, catalogName);
@@ -295,4 +306,45 @@ public final class Catalogs {
}
}
}
+
+ /**
+ * Parse the table schema from the properties
+ * @param props the controlling properties
+ * @return schema instance
+ */
+ private static Schema schema(Properties props) {
+ String schemaString = props.getProperty(InputFormatConfig.TABLE_SCHEMA);
+ Preconditions.checkNotNull(schemaString, "Table schema not set");
+ return SchemaParser.fromJson(props.getProperty(InputFormatConfig.TABLE_SCHEMA));
+ }
+
+ /**
+ * Get the partition spec from the properties
+ * @param props the controlling properties
+ * @param schema instance of the iceberg schema
+ * @return instance of the partition spec
+ */
+ private static PartitionSpec spec(Properties props, Schema schema) {
+ String specString = props.getProperty(InputFormatConfig.PARTITION_SPEC);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ if (specString != null) {
+ spec = PartitionSpecParser.fromJson(schema, specString);
+ }
+ return spec;
+ }
+
+ /**
+ * Create the iceberg table properties without the {@link Catalogs#PROPERTIES_TO_REMOVE}
+ * @param props the controlling properties
+ * @return map of iceberg table properties
+ */
+ private static Map<String, String> filterIcebergTableProperties(Properties props) {
+ Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
+ for (Object key : props.keySet()) {
+ if (!PROPERTIES_TO_REMOVE.contains(key)) {
+ map.put(key.toString(), props.get(key).toString());
+ }
+ }
+ return map;
+ }
}
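
To see the new Catalogs.registerTable entry point from a caller's perspective, here is a minimal sketch, assuming a configured Hive catalog; the table name, schema, and metadata path are illustrative only, not part of the commit:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.SchemaParser;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.mr.Catalogs;
    import org.apache.iceberg.mr.InputFormatConfig;
    import org.apache.iceberg.types.Types;

    public class RegisterTableSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Properties props = new Properties();
        // "name" is the Catalogs NAME property key (database.table).
        props.setProperty("name", "default.target");
        // A schema must still be supplied: registerTable checks it up front even
        // though it is only consulted on the HadoopTables fallback path.
        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
        props.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
        Table table = Catalogs.registerTable(conf, props,
            "/warehouse/source/metadata/00002-abc.metadata.json");
        System.out.println(table.location());
      }
    }
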
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index cb7248029d..a27078581e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -130,6 +130,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
private UpdatePartitionSpec updatePartitionSpec;
private Transaction transaction;
private AlterTableType currentAlterTableOp;
+ private boolean createHMSTableInHook = false;
public HiveIcebergMetaHook(Configuration conf) {
this.conf = conf;
@@ -183,6 +184,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
setCommonHmsTablePropertiesForIceberg(hmsTable);
+
+ if (hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) {
+ createHMSTableInHook = true;
+ }
}
@Override
@@ -197,7 +202,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
catalogProperties.put(TableProperties.ENGINE_HIVE_ENABLED, true);
}
- Catalogs.createTable(conf, catalogProperties);
+ String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+ if (metadataLocation != null) {
+ Catalogs.registerTable(conf, catalogProperties, metadataLocation);
+ } else {
+ Catalogs.createTable(conf, catalogProperties);
+ }
}
}
@@ -432,6 +442,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
context.putToProperties("truncateSkipDataDeletion", "true");
}
+ @Override public boolean createHMSTableInHook() {
+ return createHMSTableInHook;
+ }
+
private void alterTableProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable,
Map<String, String> contextProperties) {
Map<String, String> hmsTableParameters = hmsTable.getParameters();
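
Taken together, the hunks above implement the new flow: preCreateTable notes whether the incoming HMS table carries a metadata pointer, and commitCreateTable branches on it. The createHMSTableInHook flag matters because, with a Hive catalog, Iceberg's registerTable path creates the HMS table itself, so the metastore client (see the HiveMetaStoreClient hunk at the end) must skip its own create_table call. A condensed paraphrase, not additional commit code:

    // preCreateTable: remember that the user supplied an existing metadata file.
    createHMSTableInHook =
        hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);

    // commitCreateTable: register against the existing metadata when present,
    // otherwise create fresh table metadata as before.
    String metadataLocation =
        hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
    if (metadataLocation != null) {
      Catalogs.registerTable(conf, catalogProperties, metadataLocation);
    } else {
      Catalogs.createTable(conf, catalogProperties);
    }
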
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index f020a2131e..31b2800d81 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -1422,6 +1423,39 @@ public class TestHiveIcebergStorageHandlerNoScan {
Assert.assertEquals("iceberg://" + hmsTable.getDbName() + "/" +
hmsTable.getTableName(), uriForAuth.toString());
}
+ @Test
+ public void testCreateTableWithMetadataLocation() throws IOException {
+ Assume.assumeTrue("Create with metadata location is only supported for Hive Catalog tables",
+ testTableType.equals(TestTables.TestTableType.HIVE_CATALOG));
+ TableIdentifier sourceIdentifier = TableIdentifier.of("default", "source");
+ Table sourceTable =
+ testTables.createTable(shell, sourceIdentifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ PartitionSpec.unpartitioned(), FileFormat.PARQUET, Collections.emptyList(), 1,
+ ImmutableMap.<String, String>builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build());
+ testTables.appendIcebergTable(shell.getHiveConf(), sourceTable, FileFormat.PARQUET, null,
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation();
+ shell.executeStatement("DROP TABLE " + sourceIdentifier.name());
+ TableIdentifier targetIdentifier = TableIdentifier.of("default", "target");
+ Table targetTable =
+ testTables.createTable(shell, targetIdentifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ PartitionSpec.unpartitioned(), FileFormat.PARQUET, Collections.emptyList(), 1,
+ ImmutableMap.<String, String>builder().put("metadata_location", metadataLocation).build()
+ );
+ Assert.assertEquals(metadataLocation, ((BaseTable) targetTable).operations().current().metadataFileLocation());
+ List<Object[]> rows = shell.executeStatement("SELECT * FROM " + targetIdentifier.name());
+ List<Record> records = HiveIcebergTestUtils.valueForRow(targetTable.schema(), rows);
+ HiveIcebergTestUtils.validateData(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, records, 0);
+ // append a second set of data to the target table
+ testTables.appendIcebergTable(shell.getHiveConf(), targetTable, FileFormat.PARQUET, null,
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ rows = shell.executeStatement("SELECT * FROM " + targetIdentifier.name());
+ records = HiveIcebergTestUtils.valueForRow(targetTable.schema(), rows);
+ HiveIcebergTestUtils.validateData(
+ Stream.concat(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.stream(),
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.stream()).collect(Collectors.toList()), records, 0);
+ }
+
/**
* Checks that the new schema has newintcol and newstring col columns on both HMS and Iceberg sides
* @throws Exception - any test error
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index fc0f0ca558..4bf7f88652 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -170,7 +170,25 @@ abstract class TestTables {
}
/**
- * Creates an non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+ * Creates a non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+ * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+ * provided List of {@link Record}s.
+ * @param shell The HiveShell used for Hive table creation
+ * @param tableName The name of the test table
+ * @param schema The schema used for the table creation
+ * @param fileFormat The file format used for writing the data
+ * @param records The records with which the table is populated
+ * @param tblProperties Additional table properties
+ * @return The created table
+ * @throws IOException If there is an error writing data
+ */
+ public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
+ List<Record> records, Map<String, String> tblProperties) throws IOException {
+ return createTable(shell, tableName, schema, fileFormat, records, 1, tblProperties);
+ }
+
+ /**
+ * Creates a non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
* table as well when needed. The table will be in the 'default' database. The table will be populated with the
* provided List of {@link Record}s.
* @param shell The HiveShell used for Hive table creation
@@ -184,7 +202,27 @@ abstract class TestTables {
*/
public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
List<Record> records, int formatVersion) throws IOException {
- Map<String, String> tblProps = ImmutableMap.of(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion));
+ return createTable(shell, tableName, schema, fileFormat, records, formatVersion, Collections.emptyMap());
+ }
+
+ /**
+ * Creates a non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+ * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+ * provided List of {@link Record}s.
+ * @param shell The HiveShell used for Hive table creation
+ * @param tableName The name of the test table
+ * @param schema The schema used for the table creation
+ * @param fileFormat The file format used for writing the data
+ * @param records The records with which the table is populated
+ * @param formatVersion The version of the spec the table should use (format-version)
+ * @param tblProperties Additional table properties
+ * @return The created table
+ * @throws IOException If there is an error writing data
+ */
+ public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
+ List<Record> records, int formatVersion, Map<String, String> tblProperties) throws IOException {
+ ImmutableMap<String, String> tblProps = ImmutableMap.<String, String>builder().putAll(tblProperties)
+ .put(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion)).build();
Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, tblProps, records);
String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName), tblProps);
if (createHiveSQL != null) {
@@ -226,13 +264,32 @@ abstract class TestTables {
*/
public Table createTable(TestHiveShell shell, String tableName, Schema schema, PartitionSpec spec,
FileFormat fileFormat, List<Record> records, Integer formatVersion) {
- TableIdentifier identifier = TableIdentifier.of("default", tableName);
- String tblProps = propertiesForCreateTableSQL(ImmutableMap.of(
- TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString(),
- InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema),
- InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec),
- TableProperties.FORMAT_VERSION, Integer.toString(formatVersion)));
+ return createTable(shell, tableName, schema, spec, fileFormat, records, formatVersion, Collections.emptyMap());
+ }
+ /**
+ * Creates a partitioned Hive test table using Hive SQL. The table will be in the 'default' database.
+ * The table will be populated with the provided List of {@link Record}s using a Hive insert statement.
+ * @param shell The HiveShell used for Hive table creation
+ * @param tableName The name of the test table
+ * @param schema The schema used for the table creation
+ * @param spec The partition specification for the table
+ * @param fileFormat The file format used for writing the data
+ * @param records The records with which the table is populated
+ * @param formatVersion The version of the spec the table should use (format-version)
+ * @param tblProperties Additional table properties
+ * @return The created table
+ * @throws IOException If there is an error writing data
+ */
+ public Table createTable(TestHiveShell shell, String tableName, Schema schema, PartitionSpec spec,
+ FileFormat fileFormat, List<Record> records, Integer formatVersion, Map<String, String> tblProperties) {
+ TableIdentifier identifier = TableIdentifier.of("default", tableName);
+ String tblProps = propertiesForCreateTableSQL(ImmutableMap.<String, String>builder().putAll(tblProperties)
+ .put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString())
+ .put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema))
+ .put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec))
+ .put(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion))
+ .build());
shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
" STORED BY ICEBERG " + locationForCreateTableSQL(identifier) +
tblProps);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
index e8945b5dd0..b2e15ee9a6 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
@@ -167,4 +167,12 @@ public interface HiveMetaHook {
public default void preTruncateTable(Table table, EnvironmentContext context) throws MetaException {
// Do nothing
}
+
+ /**
+ * Returns true if the HMS table should be created by the implementing class.
+ * @return true if the hook takes over HMS table creation, false otherwise
+ */
+ default boolean createHMSTableInHook() {
+ return false;
+ }
}
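
For hook implementers, the new method is an opt-in default. A hypothetical hook that takes over HMS table creation might look like the sketch below; the class name and method bodies are illustrative, not from the patch, and the remaining HiveMetaHook callbacks are stubbed as no-ops:

    import org.apache.hadoop.hive.metastore.HiveMetaHook;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class SelfRegisteringMetaHook implements HiveMetaHook {
      @Override
      public void preCreateTable(Table table) throws MetaException {
        // validate or rewrite table parameters here
      }

      @Override
      public void commitCreateTable(Table table) throws MetaException {
        // With createHMSTableInHook() returning true, the client skips its own
        // create_table call, so the backing catalog must create the HMS entry here.
      }

      @Override
      public void rollbackCreateTable(Table table) throws MetaException {
        // clean up anything the create path left behind
      }

      @Override
      public void preDropTable(Table table) throws MetaException {
      }

      @Override
      public void rollbackDropTable(Table table) throws MetaException {
      }

      @Override
      public void commitDropTable(Table table, boolean deleteData) throws MetaException {
      }

      @Override
      public boolean createHMSTableInHook() {
        return true;
      }
    }
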
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 85ed8e39d3..651546caa5 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -1385,7 +1385,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
boolean success = false;
try {
// Subclasses can override this step (for example, for temporary tables)
- create_table(request);
+ if (hook == null || !hook.createHMSTableInHook()) {
+ create_table(request);
+ }
if (hook != null) {
hook.commitCreateTable(tbl);
}
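
A note on this last hunk: when createHMSTableInHook() returns true, the client leaves HMS table creation entirely to hook.commitCreateTable(tbl); in the Iceberg case the registerTable call persists the HMS entry as a side effect, so issuing create_table as well would fail on the duplicate. Hooks that keep the default false see the previous behavior unchanged.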