pvary commented on a change in pull request #1495:
URL: https://github.com/apache/iceberg/pull/1495#discussion_r494088866



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -27,21 +27,32 @@
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
 import org.apache.iceberg.mr.mapred.Container;
 
 public class HiveIcebergSerDe extends AbstractSerDe {
-
+  private Schema schema;
   private ObjectInspector inspector;
 
   @Override
   public void initialize(@Nullable Configuration configuration, Properties serDeProperties) throws SerDeException {
-    Table table = Catalogs.loadTable(configuration, serDeProperties);
-
     try {
-      this.inspector = IcebergObjectInspector.create(table.schema());
+      String schemaString = (String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA);
+      if (schemaString != null) {
+        schema = SchemaParser.fromJson(schemaString);
+      } else {
+        try {
+          schema = Catalogs.loadTable(configuration, serDeProperties).schema();
+        } catch (NoSuchTableException nte) {
+          throw new SerDeException("Please provide an existing table or a valid schema", nte);
+        }
+      }
+      inspector = IcebergObjectInspector.create(schema);

Review comment:
       Added a comment:
   
   HiveIcebergSerDe.initialize is called from multiple places in the Hive code:
   - When we are trying to create a table - the Hive DDL data is stored in the serDeProperties, but no Iceberg table has been created yet.
   - When we are compiling the Hive query on the HiveServer2 side - we only have the table information (location/name), and we have to read the schema using the table data. This is called multiple times, so there is room for optimization here.
   - When we are executing the Hive query in the execution engine - we do not want to load the table data on every executor, but the serDeProperties are populated by HiveIcebergStorageHandler.configureInputJobProperties() and the resulting properties are serialized and distributed to the executors (see the sketch below).
   
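   To make the last bullet concrete, here is a minimal sketch of the intended flow (illustrative only - the class and method names below are made up and are not the actual handler code; the property key and parser calls are the ones used in this PR):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;

public class SchemaPropagationSketch {

  // HiveServer2 side (hypothetical helper): the catalog is reachable, so load the table
  // once and serialize its schema into the job properties that travel to the executors.
  static void addSchemaToJobProperties(Configuration conf, Properties tableProps, Map<String, String> jobProperties) {
    Table table = Catalogs.loadTable(conf, tableProps);
    jobProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema()));
  }

  // Executor side (hypothetical helper): rebuild the schema from the serialized
  // properties, so no catalog or table load is needed on the executors.
  static Schema schemaFromProperties(Properties serDeProperties) {
    String schemaJson = serDeProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
    return schemaJson != null ? SchemaParser.fromJson(schemaJson) : null;
  }
}
```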

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);

Review comment:
       You have already mentioned this once. I will not forget next time, sorry.
   Done

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);
+    try {
+      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+          "Iceberg table already created - can not use provided schema");
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+          "Iceberg table already created - can not use provided partition specification");
+
+      LOG.info("Iceberg table already exists {}", icebergTable);
+    } catch (NoSuchTableException nte) {
+      String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+      Preconditions.checkNotNull(schemaString, "Please provide a table schema");
+      // Just check if it is parsable, and later use for partition specification parsing
+      Schema schema = SchemaParser.fromJson(schemaString);
+
+      String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
+      if (specString != null) {
+        // Just check if it is parsable
+        PartitionSpecParser.fromJson(schema, schemaString);
+      }
+
+      // Allow purging table data if the table is created now and not set otherwise
+      if (hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE) == null) {
+        hmsTable.getParameters().put(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "TRUE");
+      }
+
+      // Set the table type even for non HiveCatalog based tables
+      hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+          BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
+
+      // Remove creation related properties
+      PARAMETERS_TO_REMOVE.forEach(hmsTable.getParameters()::remove);
+    }
+  }
+
+  @Override
+  public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    // do nothing
+  }
+
+  @Override
+  public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    if (icebergTable == null) {
+      catalogProperties.put(HiveTableOperations.TABLE_FROM_HIVE, true);
+      LOG.info("Iceberg table creation with the following properties {}", catalogProperties.keySet());

Review comment:
       Removed

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);
+    try {
+      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+          "Iceberg table already created - can not use provided schema");
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+          "Iceberg table already created - can not use provided partition specification");
+
+      LOG.info("Iceberg table already exists {}", icebergTable);
+    } catch (NoSuchTableException nte) {
+      String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+      Preconditions.checkNotNull(schemaString, "Please provide a table schema");
+      // Just check if it is parsable, and later use for partition specification parsing
+      Schema schema = SchemaParser.fromJson(schemaString);
+
+      String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
+      if (specString != null) {
+        // Just check if it is parsable
+        PartitionSpecParser.fromJson(schema, schemaString);
+      }
+
+      // Allow purging table data if the table is created now and not set otherwise
+      if (hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE) == null) {
+        hmsTable.getParameters().put(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "TRUE");

Review comment:
       I can see 2 different use-cases for Hive tables backed by Iceberg tables:
   1. The table is created from Hive. When we drop the table, we want to drop the underlying Iceberg table as well.
   2. The table is created outside of Hive, but we want to read it from Hive as well. We create the Hive table on top of the Iceberg table, but when we drop the Hive table we do not want to drop the underlying Iceberg table.
   
   The proposed solution is:
   - Default behavior: when the table is created from Hive and Hive needs to create the underlying Iceberg table, then Hive masters the data. If the underlying Iceberg table already exists, then Hive does not master the data. When the Hive table is dropped, the underlying Iceberg table is dropped only if Hive is mastering the data.
   - HIVE_DELETE_BACKING_TABLE can override the default behavior when dropping the table (see the sketch below).
   
   There is 1 exception: if the HiveCatalog is used, it is obviously not possible to keep the Iceberg table if we drop the HMS table. That is why this check is needed.
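   A minimal sketch of the intended drop decision (the helper below is hypothetical and only illustrates the rules above; in the PR the actual logic lives in preDropTable/commitDropTable):

```java
import java.util.Map;

import org.apache.iceberg.mr.InputFormatConfig;

public class DropDecisionSketch {

  // Returns true if dropping the Hive table should also purge the backing Iceberg table.
  static boolean shouldDropBackingTable(Map<String, String> hmsParameters, boolean hiveCreatedIcebergTable) {
    String override = hmsParameters.get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE);
    if (override != null) {
      // An explicit table property overrides the default behavior
      return Boolean.parseBoolean(override);
    }
    // Default: drop the Iceberg table only when Hive masters the data
    return hiveCreatedIcebergTable;
  }
}
```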

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);
+    try {
+      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+          "Iceberg table already created - can not use provided schema");
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+          "Iceberg table already created - can not use provided partition specification");
+
+      LOG.info("Iceberg table already exists {}", icebergTable);
+    } catch (NoSuchTableException nte) {
+      String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+      Preconditions.checkNotNull(schemaString, "Please provide a table schema");
+      // Just check if it is parsable, and later use for partition specification parsing
+      Schema schema = SchemaParser.fromJson(schemaString);
+
+      String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
+      if (specString != null) {
+        // Just check if it is parsable
+        PartitionSpecParser.fromJson(schema, schemaString);
+      }
+
+      // Allow purging table data if the table is created now and not set otherwise
+      if (hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE) == null) {
+        hmsTable.getParameters().put(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "TRUE");

Review comment:
       I can see 2 different use-cases for Hive tables backed by Iceberg tables:
   1. The table is created and used only from Hive. When we drop the table, we want to drop the underlying Iceberg table as well.
   2. The table is created outside of Hive, but we want to read it from Hive as well. We create the Hive table on top of the Iceberg table, but when we drop the Hive table we do not want to drop the underlying Iceberg table.
   
   The proposed solution is:
   - Default behavior: when the table is created from Hive and Hive needs to create the underlying Iceberg table, then Hive masters the data. If the underlying Iceberg table already exists, then Hive does not master the data. When the Hive table is dropped, the underlying Iceberg table is dropped only if Hive is mastering the data.
   - HIVE_DELETE_BACKING_TABLE can override the default behavior when dropping the table.
   
   There is 1 exception: if the HiveCatalog is used, it is obviously not possible to keep the Iceberg table if we drop the HMS table. That is why this check is needed.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);
+    try {
+      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+          "Iceberg table already created - can not use provided schema");
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+          "Iceberg table already created - can not use provided partition specification");
+
+      LOG.info("Iceberg table already exists {}", icebergTable);
+    } catch (NoSuchTableException nte) {
+      String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+      Preconditions.checkNotNull(schemaString, "Please provide a table schema");
+      // Just check if it is parsable, and later use for partition specification parsing
+      Schema schema = SchemaParser.fromJson(schemaString);
+
+      String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
+      if (specString != null) {
+        // Just check if it is parsable
+        PartitionSpecParser.fromJson(schema, schemaString);
+      }
+
+      // Allow purging table data if the table is created now and not set otherwise
+      if (hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE) == null) {
+        hmsTable.getParameters().put(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "TRUE");

Review comment:
       I can see 2 different use-cases for Hive tables backed by Iceberg tables:
   1. The table is created and used only from Hive. When we drop the table, we want to drop the underlying Iceberg table as well.
   2. The table is created outside of Hive, but we want to read it from Hive as well. We create the Hive table on top of the Iceberg table, but when we drop the Hive table we do not want to drop the underlying Iceberg table.
   
   The proposed solution is:
   - Default behavior: when the table is created from Hive and Hive needs to create the underlying Iceberg table, then Hive masters the data. If the underlying Iceberg table already exists, then Hive does not master the data. When the Hive table is dropped, the underlying Iceberg table is dropped only if Hive is mastering the data.
   - HIVE_DELETE_BACKING_TABLE can override the default behavior when dropping the table.
   
   There is 1 exception: if the HiveCatalog is used, it is obviously not possible to keep the Iceberg table if we drop the HMS table. That is why this check is needed.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);
+    try {
+      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+          "Iceberg table already created - can not use provided schema");
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+          "Iceberg table already created - can not use provided partition specification");
+
+      LOG.info("Iceberg table already exists {}", icebergTable);
+    } catch (NoSuchTableException nte) {
+      String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+      Preconditions.checkNotNull(schemaString, "Please provide a table schema");
+      // Just check if it is parsable, and later use for partition specification parsing
+      Schema schema = SchemaParser.fromJson(schemaString);
+
+      String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
+      if (specString != null) {
+        // Just check if it is parsable
+        PartitionSpecParser.fromJson(schema, schemaString);
+      }
+
+      // Allow purging table data if the table is created now and not set otherwise
+      if (hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE) == null) {
+        hmsTable.getParameters().put(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "TRUE");
+      }
+
+      // Set the table type even for non HiveCatalog based tables
+      hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+          BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
+
+      // Remove creation related properties
+      PARAMETERS_TO_REMOVE.forEach(hmsTable.getParameters()::remove);
+    }
+  }
+
+  @Override
+  public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    // do nothing
+  }
+
+  @Override
+  public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    if (icebergTable == null) {
+      catalogProperties.put(HiveTableOperations.TABLE_FROM_HIVE, true);
+      LOG.info("Iceberg table creation with the following properties {}", catalogProperties.keySet());
+      Catalogs.createTable(conf, catalogProperties);
+    }
+  }
+
+  @Override
+  public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws MetaException {
+    catalogProperties = getCatalogProperties(hmsTable);
+    deleteIcebergTable = hmsTable.getParameters() != null &&
+        "TRUE".equalsIgnoreCase(hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE));
+
+    if (!deleteIcebergTable) {
+      if (!Catalogs.canWorkWithoutHive(conf)) {
+        // This should happen only if someone were manually removing this property from the table, or
+        // added the table from outside of Hive
+        throw new MetaException("Can not drop Hive table and keep Iceberg table data when using HiveCatalog. " +
+            "Please add " + InputFormatConfig.HIVE_DELETE_BACKING_TABLE + "='TRUE' to TBLPROPERTIES " +
+            "of the Hive table to enable dropping");
+      }
+    }
+  }
+
+  @Override
+  public void rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    // do nothing
+  }
+
+  @Override
+  public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) {
+    if (deleteData && deleteIcebergTable) {
+      LOG.info("Dropping with purge all the data for table {}.{}", hmsTable.getDbName(), hmsTable.getTableName());
+      Catalogs.dropTable(conf, catalogProperties);
+    }
+  }
+
+  private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    Properties properties = new Properties();
+    properties.putAll(hmsTable.getParameters());
+
+    if (properties.get(Catalogs.LOCATION) == null &&
+        hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null) {
+      properties.put(Catalogs.LOCATION, hmsTable.getSd().getLocation());
+    }
+
+    if (properties.get(Catalogs.NAME) == null) {
+      properties.put(Catalogs.NAME, hmsTable.getDbName() + "." + hmsTable.getTableName());
+    }

Review comment:
       Added a comment, and modified it a bit to use TableIdentifier instead of concatenating the strings (roughly as sketched below).
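   A rough illustration of what that change amounts to (a sketch only, assuming the dotted name is still the end result; this is not the exact PR code):

```java
import java.util.Properties;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.mr.Catalogs;

public class CatalogNameSketch {

  static void setName(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
    // before: properties.put(Catalogs.NAME, hmsTable.getDbName() + "." + hmsTable.getTableName());
    // after: build a TableIdentifier instead of concatenating the strings
    TableIdentifier identifier = TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName());
    properties.put(Catalogs.NAME, identifier.toString());
  }
}
```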

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -139,6 +140,8 @@ protected void doRefresh() {
 
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    boolean updateTable = base != null || metadata.propertyAsBoolean(TABLE_FROM_HIVE, false);

Review comment:
       If we do not want to use properties for signaling this behavior (looking at it again with fresh eyes, it seems like a hack), then we have to modify the API:
   - add a TableBuilder.updateExisting method
   - add a TableOperations.commit variant with a new parameter signaling the update
   - add a method to HiveTableOperations/HadoopTableOperations to signal the update
   
   Or we have to resort to sending/hacking parameters through some levels and removing the parameter at the lower levels. A rough sketch of the first two options is below.
   
   I do not like any of these solutions, so I am happy to go with any other viable solution...
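   Purely hypothetical signatures, only to make the options above concrete; none of these methods exist in Iceberg today:

```java
import org.apache.iceberg.TableMetadata;

// Option 1: a builder-level flag marking the commit as an update of an existing (HMS-created) table
interface UpdateAwareTableBuilder {
  UpdateAwareTableBuilder updateExisting();
}

// Option 2: a commit variant that carries the "update existing table" signal explicitly
interface UpdateAwareTableOperations {
  void commit(TableMetadata base, TableMetadata metadata, boolean updateExisting);
}
```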

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -139,6 +140,8 @@ protected void doRefresh() {
 
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    boolean updateTable = base != null || metadata.propertyAsBoolean(TABLE_FROM_HIVE, false);

Review comment:
       Yes, you are right.
   
   With the HiveMetaHooks we have the possibility to do some `preCreate` and `commitCreate` work, but the table has to be created by the HMS between these two phases. I have even thought about dropping the HMS-created table in the `commitCreate` method, but that seems like a serious waste of effort, and I also do not have an HMS client at hand.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);
+    try {
+      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+          "Iceberg table already created - can not use provided schema");
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+          "Iceberg table already created - can not use provided partition specification");
+
+      LOG.info("Iceberg table already exists {}", icebergTable);
+    } catch (NoSuchTableException nte) {
+      String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+      Preconditions.checkNotNull(schemaString, "Please provide a table schema");
+      // Just check if it is parsable, and later use for partition specification parsing
+      Schema schema = SchemaParser.fromJson(schemaString);
+
+      String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
+      if (specString != null) {
+        // Just check if it is parsable
+        PartitionSpecParser.fromJson(schema, schemaString);
+      }
+
+      // Allow purging table data if the table is created now and not set otherwise
+      if (hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE) == null) {
+        hmsTable.getParameters().put(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "TRUE");

Review comment:
       The `EXTERNAL` / `MANAGED` definition has shifted a little bit in Hive lately.
   Previously the only real difference was that when the table was dropped, the data of `EXTERNAL` tables was not removed, while the data of `MANAGED` tables was cleaned up. Since Hive 3, `MANAGED` means "do not touch my directories because there will be data loss", and even for `EXTERNAL` tables we have a new property, `external.table.purge`, which means that the data should be cleaned up if the table is dropped.

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -139,6 +140,8 @@ protected void doRefresh() {
 
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    boolean updateTable = base != null || metadata.propertyAsBoolean(TABLE_FROM_HIVE, false);

Review comment:
       This would be a slight modification to the API.
   - Before the change we throw a NoSuchIcebergTableException if we try to load a Hive table which does not have the correct table type and the metadata location set.
   - After the change we will try to update the table with the correct parameters.
   
   Would this be an acceptable change in the API behavior?

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
+  private static final Set<String> PARAMETERS_TO_REMOVE = Stream
+      .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME)
+      .collect(Collectors.toCollection(HashSet::new));
+  private static final Set<String> PROPERTIES_TO_REMOVE = Stream
+      .of(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "storage_handler", "EXTERNAL")
+      .collect(Collectors.toCollection(HashSet::new));
+
+  private final Configuration conf;
+  private Table icebergTable = null;
+  private Properties catalogProperties;
+  private boolean deleteIcebergTable;
+
+  public HiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    catalogProperties = getCatalogProperties(hmsTable);
+    try {
+      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+          "Iceberg table already created - can not use provided schema");
+      Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+          "Iceberg table already created - can not use provided partition specification");
+
+      LOG.info("Iceberg table already exists {}", icebergTable);
+    } catch (NoSuchTableException nte) {
+      String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+      Preconditions.checkNotNull(schemaString, "Please provide a table schema");
+      // Just check if it is parsable, and later use for partition specification parsing
+      Schema schema = SchemaParser.fromJson(schemaString);
+
+      String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
+      if (specString != null) {
+        // Just check if it is parsable
+        PartitionSpecParser.fromJson(schema, schemaString);
+      }
+
+      // Allow purging table data if the table is created now and not set otherwise
+      if (hmsTable.getParameters().get(InputFormatConfig.HIVE_DELETE_BACKING_TABLE) == null) {
+        hmsTable.getParameters().put(InputFormatConfig.HIVE_DELETE_BACKING_TABLE, "TRUE");

Review comment:
       Maybe this doc can help understand the new `MANAGED` tables: 
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/using-hiveql/content/hive_hive_3_tables.html




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
