This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c561a71d8 HIVE-26190: Implement create iceberg table with metadata location. (#3259) (Laszlo Pinter, reviewed by Peter Vary)
6c561a71d8 is described below

commit 6c561a71d84d90086650ddd7f7b532eae534fa0d
Author: László Pintér <[email protected]>
AuthorDate: Fri May 6 15:13:07 2022 +0200

    HIVE-26190: Implement create iceberg table with metadata location. (#3259) (Laszlo Pinter, reviewed by Peter Vary)
---
 .../main/java/org/apache/iceberg/mr/Catalogs.java  | 86 +++++++++++++++++-----
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       | 16 +++-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  | 34 +++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     | 73 ++++++++++++++++--
 .../apache/hadoop/hive/metastore/HiveMetaHook.java |  8 ++
 .../hadoop/hive/metastore/HiveMetaStoreClient.java |  4 +-
 6 files changed, 194 insertions(+), 27 deletions(-)

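The change lets a CREATE TABLE statement point at an existing Iceberg metadata
file, so a table can be registered in HMS without rewriting any data or
metadata. Based on the test added in this patch, usage looks roughly like the
following sketch (the table name and metadata file path are illustrative):

    CREATE EXTERNAL TABLE target
    STORED BY ICEBERG
    TBLPROPERTIES ('metadata_location'='/warehouse/source/metadata/00002-example.metadata.json');
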
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index ccd3c6e196..d716e2d614 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -137,26 +137,12 @@ public final class Catalogs {
    * @return the created Iceberg table
    */
   public static Table createTable(Configuration conf, Properties props) {
-    String schemaString = props.getProperty(InputFormatConfig.TABLE_SCHEMA);
-    Preconditions.checkNotNull(schemaString, "Table schema not set");
-    Schema schema = SchemaParser.fromJson(props.getProperty(InputFormatConfig.TABLE_SCHEMA));
-
-    String specString = props.getProperty(InputFormatConfig.PARTITION_SPEC);
-    PartitionSpec spec = PartitionSpec.unpartitioned();
-    if (specString != null) {
-      spec = PartitionSpecParser.fromJson(schema, specString);
-    }
-
+    Schema schema = schema(props);
+    PartitionSpec spec = spec(props, schema);
     String location = props.getProperty(LOCATION);
     String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
 
-    // Create a table property map without the controlling properties
-    Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
-    for (Object key : props.keySet()) {
-      if (!PROPERTIES_TO_REMOVE.contains(key)) {
-        map.put(key.toString(), props.get(key).toString());
-      }
-    }
+    Map<String, String> map = filterIcebergTableProperties(props);
 
     Optional<Catalog> catalog = loadCatalog(conf, catalogName);
 
@@ -214,6 +200,31 @@ public final class Catalogs {
     return getCatalogProperties(conf, catalogName, catalogType).get(CatalogProperties.CATALOG_IMPL) == null;
   }
 
+  /**
+   * Register a table with the configured catalog if it does not exist.
+   * @param conf a Hadoop conf
+   * @param props the controlling properties
+   * @param metadataLocation the location of a metadata file
+   * @return the created Iceberg table
+   */
+  public static Table registerTable(Configuration conf, Properties props, String metadataLocation) {
+    Schema schema = schema(props);
+    PartitionSpec spec = spec(props, schema);
+    Map<String, String> map = filterIcebergTableProperties(props);
+    String location = props.getProperty(LOCATION);
+    String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
+
+    Optional<Catalog> catalog = loadCatalog(conf, catalogName);
+    if (catalog.isPresent()) {
+      String name = props.getProperty(NAME);
+      Preconditions.checkNotNull(name, "Table identifier not set");
+      return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation);
+    }
+
+    Preconditions.checkNotNull(location, "Table location not set");
+    return new HadoopTables(conf).create(schema, spec, map, location);
+  }
+
   @VisibleForTesting
  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
     String catalogType = getCatalogType(conf, catalogName);
@@ -295,4 +306,45 @@ public final class Catalogs {
       }
     }
   }
+
+  /**
+   * Parse the table schema from the properties
+   * @param props the controlling properties
+   * @return schema instance
+   */
+  private static Schema schema(Properties props) {
+    String schemaString = props.getProperty(InputFormatConfig.TABLE_SCHEMA);
+    Preconditions.checkNotNull(schemaString, "Table schema not set");
+    return SchemaParser.fromJson(props.getProperty(InputFormatConfig.TABLE_SCHEMA));
+  }
+
+  /**
+   * Get the partition spec from the properties
+   * @param props the controlling properties
+   * @param schema instance of the iceberg schema
+   * @return instance of the partition spec
+   */
+  private static PartitionSpec spec(Properties props, Schema schema) {
+    String specString = props.getProperty(InputFormatConfig.PARTITION_SPEC);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    if (specString != null) {
+      spec = PartitionSpecParser.fromJson(schema, specString);
+    }
+    return spec;
+  }
+
+  /**
+   * Create the iceberg table properties without the {@link Catalogs#PROPERTIES_TO_REMOVE}
+   * @param props the controlling properties
+   * @return map of iceberg table properties
+   */
+  private static Map<String, String> filterIcebergTableProperties(Properties props) {
+    Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
+    for (Object key : props.keySet()) {
+      if (!PROPERTIES_TO_REMOVE.contains(key)) {
+        map.put(key.toString(), props.get(key).toString());
+      }
+    }
+    return map;
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index cb7248029d..a27078581e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -130,6 +130,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   private UpdatePartitionSpec updatePartitionSpec;
   private Transaction transaction;
   private AlterTableType currentAlterTableOp;
+  private boolean createHMSTableInHook = false;
 
   public HiveIcebergMetaHook(Configuration conf) {
     this.conf = conf;
@@ -183,6 +184,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
     catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
     setCommonHmsTablePropertiesForIceberg(hmsTable);
+
+    if (hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) {
+      createHMSTableInHook = true;
+    }
   }
 
   @Override
@@ -197,7 +202,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
         catalogProperties.put(TableProperties.ENGINE_HIVE_ENABLED, true);
       }
 
-      Catalogs.createTable(conf, catalogProperties);
+      String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+      if (metadataLocation != null) {
+        Catalogs.registerTable(conf, catalogProperties, metadataLocation);
+      } else {
+        Catalogs.createTable(conf, catalogProperties);
+      }
     }
   }
 
@@ -432,6 +442,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     context.putToProperties("truncateSkipDataDeletion", "true");
   }
 
+  @Override public boolean createHMSTableInHook() {
+    return createHMSTableInHook;
+  }
+
  private void alterTableProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable,
       Map<String, String> contextProperties) {
     Map<String, String> hmsTableParameters = hmsTable.getParameters();
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index f020a2131e..31b2800d81 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -1422,6 +1423,39 @@ public class TestHiveIcebergStorageHandlerNoScan {
     Assert.assertEquals("iceberg://" + hmsTable.getDbName() + "/" + 
hmsTable.getTableName(), uriForAuth.toString());
   }
 
+  @Test
+  public void testCreateTableWithMetadataLocation() throws IOException {
+    Assume.assumeTrue("Create with metadata location is only supported for 
Hive Catalog tables",
+        testTableType.equals(TestTables.TestTableType.HIVE_CATALOG));
+    TableIdentifier sourceIdentifier = TableIdentifier.of("default", "source");
+    Table sourceTable =
+        testTables.createTable(shell, sourceIdentifier.name(), 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            PartitionSpec.unpartitioned(), FileFormat.PARQUET, 
Collections.emptyList(), 1,
+            ImmutableMap.<String, 
String>builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build());
+    testTables.appendIcebergTable(shell.getHiveConf(), sourceTable, 
FileFormat.PARQUET, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    String metadataLocation = ((BaseTable) 
sourceTable).operations().current().metadataFileLocation();
+    shell.executeStatement("DROP TABLE " + sourceIdentifier.name());
+    TableIdentifier targetIdentifier = TableIdentifier.of("default", "target");
+    Table targetTable =
+        testTables.createTable(shell, targetIdentifier.name(), 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            PartitionSpec.unpartitioned(), FileFormat.PARQUET, 
Collections.emptyList(), 1,
+            ImmutableMap.<String, String>builder().put("metadata_location", 
metadataLocation).build()
+        );
+    Assert.assertEquals(metadataLocation, ((BaseTable) 
targetTable).operations().current().metadataFileLocation());
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " + 
targetIdentifier.name());
+    List<Record> records = 
HiveIcebergTestUtils.valueForRow(targetTable.schema(), rows);
+    
HiveIcebergTestUtils.validateData(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
 records, 0);
+    // append a second set of data to the target table
+    testTables.appendIcebergTable(shell.getHiveConf(), targetTable, 
FileFormat.PARQUET, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    rows = shell.executeStatement("SELECT * FROM " + targetIdentifier.name());
+    records = HiveIcebergTestUtils.valueForRow(targetTable.schema(), rows);
+    HiveIcebergTestUtils.validateData(
+        
Stream.concat(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.stream(),
+            
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.stream()).collect(Collectors.toList()),
 records, 0);
+  }
+
   /**
    * Checks that the new schema has newintcol and newstring col columns on both HMS and Iceberg sides
    * @throws Exception - any test error
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index fc0f0ca558..4bf7f88652 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -170,7 +170,25 @@ abstract class TestTables {
   }
 
   /**
-   * Creates an non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+   * Creates a non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
    * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+   * provided List of {@link Record}s.
+   * @param shell The HiveShell used for Hive table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @param tblProperties Additional table properties
+   * @return The created table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
+      List<Record> records, Map<String, String> tblProperties) throws IOException {
+    return createTable(shell, tableName, schema, fileFormat, records, 1, tblProperties);
+  }
+
+  /**
+   * Creates a non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+   * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+   * provided List of {@link Record}s.
    * @param shell The HiveShell used for Hive table creation
@@ -184,7 +202,27 @@ abstract class TestTables {
    */
   public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
       List<Record> records, int formatVersion) throws IOException {
-    Map<String, String> tblProps = ImmutableMap.of(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion));
+    return createTable(shell, tableName, schema, fileFormat, records, formatVersion, Collections.emptyMap());
+  }
+
+  /**
+   * Creates a non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+   * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+   * provided List of {@link Record}s.
+   * @param shell The HiveShell used for Hive table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @param formatVersion The version of the spec the table should use (format-version)
+   * @param tblProperties Additional table properties
+   * @return The created table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
+      List<Record> records, int formatVersion, Map<String, String> tblProperties) throws IOException {
+    ImmutableMap<String, String> tblProps = ImmutableMap.<String, String>builder().putAll(tblProperties)
+        .put(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion)).build();
     Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, tblProps, records);
     String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName), tblProps);
     if (createHiveSQL != null) {
@@ -226,13 +264,32 @@ abstract class TestTables {
    */
   public Table createTable(TestHiveShell shell, String tableName, Schema schema, PartitionSpec spec,
       FileFormat fileFormat, List<Record> records, Integer formatVersion)  {
-    TableIdentifier identifier = TableIdentifier.of("default", tableName);
-    String tblProps = propertiesForCreateTableSQL(ImmutableMap.of(
-        TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString(),
-        InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema),
-        InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec),
-        TableProperties.FORMAT_VERSION, Integer.toString(formatVersion)));
+    return createTable(shell, tableName, schema, spec, fileFormat, records, formatVersion, Collections.emptyMap());
+  }
 
+  /**
+   * Creates a partitioned Hive test table using Hive SQL. The table will be in the 'default' database.
+   * The table will be populated with the provided List of {@link Record}s using a Hive insert statement.
+   * @param shell The HiveShell used for Hive table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param spec The partition specification for the table
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @param formatVersion The version of the spec the table should use (format-version)
+   * @param tblProperties Additional table properties
+   * @return The created table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createTable(TestHiveShell shell, String tableName, Schema schema, PartitionSpec spec,
+      FileFormat fileFormat, List<Record> records, Integer formatVersion, Map<String, String> tblProperties) {
+    TableIdentifier identifier = TableIdentifier.of("default", tableName);
+    String tblProps = propertiesForCreateTableSQL(ImmutableMap.<String, String>builder().putAll(tblProperties)
+        .put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString())
+        .put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema))
+        .put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec))
+        .put(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion))
+        .build());
     shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
         " STORED BY ICEBERG " + locationForCreateTableSQL(identifier) + 
tblProps);
 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
index e8945b5dd0..b2e15ee9a6 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
@@ -167,4 +167,12 @@ public interface HiveMetaHook {
   public default void preTruncateTable(Table table, EnvironmentContext context) throws MetaException {
     // Do nothing
   }
+
+  /**
+   * Returns true if the HMS table should be created by the implementing class.
+   * @return true if the HMS table creation is delegated to the hook, false otherwise
+   */
+  default boolean createHMSTableInHook() {
+    return false;
+  }
 }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 85ed8e39d3..651546caa5 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -1385,7 +1385,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     boolean success = false;
     try {
       // Subclasses can override this step (for example, for temporary tables)
-      create_table(request);
+      if (hook == null || !hook.createHMSTableInHook()) {
+        create_table(request);
+      }
       if (hook != null) {
         hook.commitCreateTable(tbl);
       }

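Taken together: when the table properties carry metadata_location,
HiveIcebergMetaHook sets createHMSTableInHook(), HiveMetaStoreClient skips its
own create_table call, and commitCreateTable registers the existing metadata
file through the new Catalogs.registerTable entry point. A minimal sketch of
driving that entry point directly, assuming a Hive catalog is configured on
the Configuration (the identifier, schema, and metadata path below are
illustrative, not part of the patch):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.SchemaParser;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.mr.Catalogs;
    import org.apache.iceberg.mr.InputFormatConfig;
    import org.apache.iceberg.types.Types;

    public class RegisterTableSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();  // assumed to carry the catalog settings
        Properties props = new Properties();
        // Catalogs.NAME holds the table identifier that registerTable parses.
        props.setProperty(Catalogs.NAME, "default.target");
        // registerTable resolves a schema from the properties just like createTable,
        // so a table schema must be supplied even though the metadata file defines one.
        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
        props.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
        // Register the metadata JSON written by a previously created table
        // instead of writing fresh table metadata.
        Table table = Catalogs.registerTable(conf, props,
            "/warehouse/source/metadata/00002-example.metadata.json");
        System.out.println("Registered table at: " + table.location());
      }
    }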