This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 43b1525f96b HIVE-26771: Use DDLTask to create Iceberg table when 
running ctas statement (Krisztian Kasa, reviewed by Denys Kuzmenko)
43b1525f96b is described below

commit 43b1525f96bece0bf46dfab8e73d0e1b51093555
Author: Krisztian Kasa <[email protected]>
AuthorDate: Wed Nov 30 15:59:45 2022 +0100

    HIVE-26771: Use DDLTask to create Iceberg table when running ctas 
statement (Krisztian Kasa, reviewed by Denys Kuzmenko)
    
    Closes #3802
---
 .../org/apache/hadoop/hive/conf/Constants.java     |   2 -
 .../main/java/org/apache/iceberg/mr/Catalogs.java  |   2 -
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  16 +-
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |  74 +--------
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  30 +++-
 .../org/apache/iceberg/mr/hive/HiveTableUtil.java  |  67 ++++++++
 .../iceberg/mr/hive/TestHiveIcebergCTAS.java       |   8 +-
 .../src/test/queries/positive/ctas_iceberg_orc.q   |  15 ++
 .../test/results/positive/ctas_iceberg_orc.q.out   | 174 +++++++++++++++++++++
 .../positive/ctas_iceberg_partitioned_orc.q.out    |  32 +++-
 .../hive/ql/ddl/table/create/CreateTableDesc.java  |   3 -
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |   5 +
 .../hive/ql/metadata/HiveStorageHandler.java       |  12 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  35 ++---
 .../apache/hadoop/hive/ql/parse/TaskCompiler.java  |  26 ++-
 .../org/apache/hadoop/hive/ql/plan/PlanUtils.java  |   3 +-
 .../org/apache/hadoop/hive/ql/plan/TableDesc.java  |   8 +-
 17 files changed, 377 insertions(+), 135 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java 
b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 0ebb578d10b..d39c671cac4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -105,6 +105,4 @@ public class Constants {
 
   public static final String HTTP_HEADER_REQUEST_TRACK = "X-Request-ID";
   public static final String TIME_POSTFIX_REQUEST_TRACK = "_TIME";
-
-  public static final String EXPLAIN_CTAS_LOCATION = "explainCtasLocation";
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index d716e2d6148..d77ede333d1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopTables;
-import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -225,7 +224,6 @@ public final class Catalogs {
     return new HadoopTables(conf).create(schema, spec, map, location);
   }
 
-  @VisibleForTesting
   static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
     String catalogType = getCatalogType(conf, catalogName);
     if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 7ce40fa00a7..53bbb7d149e 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -239,11 +239,23 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       
setFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
 
       String metadataLocation = 
hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+      Table table;
       if (metadataLocation != null) {
-        Catalogs.registerTable(conf, catalogProperties, metadataLocation);
+        table = Catalogs.registerTable(conf, catalogProperties, 
metadataLocation);
       } else {
-        Catalogs.createTable(conf, catalogProperties);
+        table = Catalogs.createTable(conf, catalogProperties);
       }
+
+      if (!HiveTableUtil.isCtas(catalogProperties)) {
+        return;
+      }
+
+      // set this in the query state so that we can rollback the table in the 
lifecycle hook in case of failures
+      String tableIdentifier = catalogProperties.getProperty(Catalogs.NAME);
+      SessionStateUtil.addResource(conf, InputFormatConfig.CTAS_TABLE_NAME, 
tableIdentifier);
+      SessionStateUtil.addResource(conf, tableIdentifier, table);
+
+      HiveTableUtil.createFileForTableObject(table, conf);
     }
   }
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 6a6aa0884f0..681599d39a2 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -25,17 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import 
org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
-import org.apache.hadoop.hive.ql.session.SessionStateUtil;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -50,7 +44,6 @@ import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HiveSchemaUtil;
@@ -120,16 +113,11 @@ public class HiveIcebergSerDe extends AbstractSerDe {
         this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
         // This is only for table creation, it is ok to have an empty 
partition column list
         this.partitionColumns = ImmutableList.of();
-        // create table for CTAS
-        if (e instanceof NoSuchTableException &&
-                
Boolean.parseBoolean(serDeProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTAS)))
 {
-          if (!Catalogs.hiveCatalog(configuration, serDeProperties)) {
-            throw new SerDeException(CTAS_EXCEPTION_MSG);
-          }
 
-          if (!serDeProperties.containsKey(Constants.EXPLAIN_CTAS_LOCATION)) {
-            createTableForCTAS(configuration, serDeProperties);
-          }
+        if (e instanceof NoSuchTableException &&
+            HiveTableUtil.isCtas(serDeProperties) &&
+            !Catalogs.hiveCatalog(configuration, serDeProperties)) {
+          throw new SerDeException(CTAS_EXCEPTION_MSG);
         }
       }
     }
@@ -182,60 +170,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     }
   }
 
-  private void createTableForCTAS(Configuration configuration, Properties 
serDeProperties) {
-    serDeProperties.setProperty(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(tableSchema));
-
-    // Spec for the PARTITIONED BY SPEC queries (partition stored in the 
SessionState)
-    PartitionSpec spec = IcebergTableUtil.spec(configuration, tableSchema);
-    if (spec == null && !getPartitionColumnNames().isEmpty()) {
-      // Spec for the PARTITIONED BY queries (partitioned columns created by 
the compiler)
-      List<FieldSchema> partitionFields = IntStream.range(0, 
getPartitionColumnNames().size())
-          .mapToObj(i ->
-               new FieldSchema(getPartitionColumnNames().get(i), 
getPartitionColumnTypes().get(i).getTypeName(), null))
-          .collect(Collectors.toList());
-      spec = HiveSchemaUtil.spec(tableSchema, partitionFields);
-    }
-
-    if (spec != null) {
-      serDeProperties.put(InputFormatConfig.PARTITION_SPEC, 
PartitionSpecParser.toJson(spec));
-    }
-
-    // clean up the properties for table creation (so that internal serde 
props don't become table props)
-    Properties createProps = getCTASTableCreationProperties(serDeProperties);
-
-    // create CTAS table
-    LOG.info("Creating table {} for CTAS with schema: {}, and spec: {}",
-        serDeProperties.get(Catalogs.NAME), tableSchema, 
serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
-    Catalogs.createTable(configuration, createProps);
-
-    // set this in the query state so that we can rollback the table in the 
lifecycle hook in case of failures
-    SessionStateUtil.addResource(configuration, 
InputFormatConfig.CTAS_TABLE_NAME,
-        serDeProperties.getProperty(Catalogs.NAME));
-  }
-
-  private Properties getCTASTableCreationProperties(Properties 
serDeProperties) {
-    Properties tblProps = (Properties) serDeProperties.clone();
-
-    // remove the serialization-only related props
-    tblProps.remove(serdeConstants.LIST_PARTITION_COLUMNS);
-    tblProps.remove(serdeConstants.LIST_PARTITION_COLUMN_TYPES);
-    tblProps.remove(serdeConstants.LIST_PARTITION_COLUMN_COMMENTS);
-
-    tblProps.remove(serdeConstants.LIST_COLUMNS);
-    tblProps.remove(serdeConstants.LIST_COLUMN_TYPES);
-    tblProps.remove(serdeConstants.LIST_COLUMN_COMMENTS);
-
-    tblProps.remove(serdeConstants.COLUMN_NAME_DELIMITER);
-    tblProps.remove(serdeConstants.SERIALIZATION_LIB);
-    tblProps.remove(hive_metastoreConstants.TABLE_IS_CTAS);
-
-    // add the commonly-needed table properties
-    HiveIcebergMetaHook.COMMON_HMS_PROPERTIES.forEach(tblProps::putIfAbsent);
-    tblProps.setProperty(TableProperties.ENGINE_HIVE_ENABLED, "true");
-
-    return tblProps;
-  }
-
   @Override
   public Class<? extends Writable> getSerializedClass() {
     return Container.class;
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 33be9da085f..2241ded4719 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.SnapshotContext;
 import org.apache.hadoop.hive.common.type.Timestamp;
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
@@ -263,7 +262,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   }
 
   @Override
-  public boolean directInsertCTAS() {
+  public boolean directInsert() {
     return true;
   }
 
@@ -462,7 +461,11 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   @Override
   public void storageHandlerCommit(Properties commitProperties, boolean 
overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
+    String location = commitProperties.getProperty(Catalogs.LOCATION);
     Configuration configuration = SessionState.getSessionConf();
+    if (location != null) {
+      HiveTableUtil.cleanupTableObjectFile(location, configuration);
+    }
     List<JobContext> jobContextList = generateJobContext(configuration, 
tableName, overwrite);
     if (jobContextList.isEmpty()) {
       return;
@@ -729,6 +732,11 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
    */
   public static Table table(Configuration config, String name) {
     Table table = 
SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX
 + name));
+    if (table == null &&
+            config.getBoolean(hive_metastoreConstants.TABLE_IS_CTAS, false) &&
+            
StringUtils.isNotBlank(config.get(InputFormatConfig.TABLE_LOCATION))) {
+      table = HiveTableUtil.readTableObjectFromFile(config);
+    }
     checkAndSetIoConfig(config, table);
     return table;
   }
@@ -759,7 +767,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
    */
   public static void checkAndSkipIoConfigSerialization(Configuration config, 
Table table) {
     if (table != null && 
config.getBoolean(InputFormatConfig.CONFIG_SERIALIZATION_DISABLED,
-        InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT) && table.io() 
instanceof HadoopConfigurable) {
+            InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT) && 
table.io() instanceof HadoopConfigurable) {
       ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new 
NonSerializingConfig(config)::get);
     }
   }
@@ -831,16 +839,19 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
       map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + 
tableDesc.getTableName(),
           SerializationUtil.serializeToBase64(serializableTable));
     } catch (NoSuchTableException ex) {
-      if 
(!(StringUtils.isNotBlank(props.getProperty(hive_metastoreConstants.TABLE_IS_CTAS))
 &&
-              
StringUtils.isNotBlank(props.getProperty(Constants.EXPLAIN_CTAS_LOCATION)))) {
+      if (!HiveTableUtil.isCtas(props)) {
         throw ex;
       }
 
-      location = map.get(hive_metastoreConstants.META_TABLE_LOCATION);
-      if (StringUtils.isBlank(location)) {
-        location = props.getProperty(Constants.EXPLAIN_CTAS_LOCATION);
+      if (!Catalogs.hiveCatalog(configuration, props)) {
+        throw new 
UnsupportedOperationException(HiveIcebergSerDe.CTAS_EXCEPTION_MSG);
       }
 
+      location = map.get(hive_metastoreConstants.META_TABLE_LOCATION);
+
+      map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + 
tableDesc.getTableName(),
+              SerializationUtil.serializeToBase64(null));
+
       try {
         AbstractSerDe serDe = tableDesc.getDeserializer(configuration);
         HiveIcebergSerDe icebergSerDe = (HiveIcebergSerDe) serDe;
@@ -860,6 +871,9 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
     // save schema into table props as well to avoid repeatedly hitting the 
HMS during serde initializations
     // this is an exception to the interface documentation, but it's a safe 
operation to add this property
     props.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
+    if (spec == null) {
+      spec = PartitionSpec.unpartitioned();
+    }
     props.put(InputFormatConfig.PARTITION_SPEC, 
PartitionSpecParser.toJson(spec));
 
     // We need to remove this otherwise the job.xml will be invalid as column 
comments are separated with '\0' and
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
index 8c51de4c930..fd45ff488f1 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
@@ -20,6 +20,9 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -37,22 +40,37 @@ import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HiveTableUtil {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableUtil.class);
+
+  static final String TABLE_EXTENSION = ".table";
+
   private HiveTableUtil() {
   }
 
@@ -142,4 +160,53 @@ public class HiveTableUtil {
       throw new MetaException("Exception happened during the collection of 
file statuses.\n" + e.getMessage());
     }
   }
+
+  static String generateTableObjectLocation(String tableLocation, 
Configuration conf) {
+    return tableLocation + "/temp/" + 
conf.get(HiveConf.ConfVars.HIVEQUERYID.varname) + TABLE_EXTENSION;
+  }
+
+  static void createFileForTableObject(Table table, Configuration conf) {
+    String filePath = generateTableObjectLocation(table.location(), conf);
+
+    Table serializableTable = SerializableTable.copyOf(table);
+    HiveIcebergStorageHandler.checkAndSkipIoConfigSerialization(conf, 
serializableTable);
+    String serialized = SerializationUtil.serializeToBase64(serializableTable);
+
+    OutputFile serializedTableFile = table.io().newOutputFile(filePath);
+    try (ObjectOutputStream oos = new 
ObjectOutputStream(serializedTableFile.createOrOverwrite())) {
+      oos.writeObject(serialized);
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+    LOG.debug("Iceberg table metadata file is created {}", 
serializedTableFile);
+  }
+
+  static void cleanupTableObjectFile(String location, Configuration 
configuration) {
+    String tableObjectLocation = 
HiveTableUtil.generateTableObjectLocation(location, configuration);
+    try {
+      Path toDelete = new Path(tableObjectLocation);
+      FileSystem fs = Util.getFs(toDelete, configuration);
+      fs.delete(toDelete, true);
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  static Table readTableObjectFromFile(Configuration config) {
+    String location = config.get(InputFormatConfig.TABLE_LOCATION);
+    String filePath = HiveTableUtil.generateTableObjectLocation(location, 
config);
+
+    try (FileIO io = new HadoopFileIO(config)) {
+      try (ObjectInputStream ois = new 
ObjectInputStream(io.newInputFile(filePath).newStream())) {
+        return SerializationUtil.deserializeFromBase64((String) 
ois.readObject());
+      }
+    } catch (ClassNotFoundException | IOException e) {
+      throw new NotFoundException("Can not read or parse table object file: 
%s", filePath);
+    }
+  }
+
+  public static boolean isCtas(Properties properties) {
+    return 
Boolean.parseBoolean(properties.getProperty(hive_metastoreConstants.TABLE_IS_CTAS));
+  }
+
 }
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
index 8ecb9e31bd0..8cecbc759ce 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
@@ -92,7 +92,8 @@ public class TestHiveIcebergCTAS extends 
HiveIcebergStorageHandlerWithEngineBase
     org.apache.hadoop.hive.metastore.api.Table hmsTable = 
shell.metastore().getTable("default", "target");
     Assert.assertEquals(3, hmsTable.getSd().getColsSize());
     Assert.assertTrue(hmsTable.getPartitionKeys().isEmpty());
-    Assert.assertEquals(fileFormat.toString(), 
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
+    Assert.assertEquals(
+            fileFormat.toString().toLowerCase(), 
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
 
     // check Iceberg table has correct partition spec
     Table table = testTables.loadTable(TableIdentifier.of("default", 
"target"));
@@ -167,7 +168,8 @@ public class TestHiveIcebergCTAS extends 
HiveIcebergStorageHandlerWithEngineBase
     org.apache.hadoop.hive.metastore.api.Table hmsTable = 
shell.metastore().getTable("default", "target");
     Assert.assertEquals(8, hmsTable.getSd().getColsSize());
     Assert.assertTrue(hmsTable.getPartitionKeys().isEmpty());
-    Assert.assertEquals(fileFormat.toString(), 
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
+    Assert.assertEquals(
+            fileFormat.toString().toLowerCase(), 
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
 
     // check Iceberg table has correct partition spec
     Table table = testTables.loadTable(TableIdentifier.of("default", 
"target"));
@@ -213,7 +215,7 @@ public class TestHiveIcebergCTAS extends 
HiveIcebergStorageHandlerWithEngineBase
         HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
 
     shell.executeStatement(String.format(
-        "CREATE TABLE target STORED BY ICEBERG STORED AS %s %s AS SELECT * 
FROM source",
+        "CREATE EXTERNAL TABLE target STORED BY ICEBERG STORED AS %s %s AS 
SELECT * FROM source",
         fileFormat, 
testTables.locationForCreateTableSQL(TableIdentifier.of("default", "target"))));
 
     List<Object[]> objects = shell.executeStatement("SELECT * FROM target 
ORDER BY customer_id");
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/ctas_iceberg_orc.q 
b/iceberg/iceberg-handler/src/test/queries/positive/ctas_iceberg_orc.q
new file mode 100644
index 00000000000..0ab773daed3
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/ctas_iceberg_orc.q
@@ -0,0 +1,15 @@
+set hive.explain.user=false;
+
+create table source(a int, b string, c int);
+
+insert into source values (1, 'one', 3);
+insert into source values (1, 'two', 4);
+
+explain
+create external table tbl_ice stored by iceberg stored as orc tblproperties 
('format-version'='2') as
+select a, b, c from source;
+
+create external table tbl_ice stored by iceberg stored as orc tblproperties 
('format-version'='2') as
+select a, b, c from source;
+
+select * from tbl_ice;
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_orc.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_orc.q.out
new file mode 100644
index 00000000000..a110715c9c1
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_orc.q.out
@@ -0,0 +1,174 @@
+PREHOOK: query: create table source(a int, b string, c int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source
+POSTHOOK: query: create table source(a int, b string, c int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source
+PREHOOK: query: insert into source values (1, 'one', 3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@source
+POSTHOOK: query: insert into source values (1, 'one', 3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@source
+POSTHOOK: Lineage: source.a SCRIPT []
+POSTHOOK: Lineage: source.b SCRIPT []
+POSTHOOK: Lineage: source.c SCRIPT []
+PREHOOK: query: insert into source values (1, 'two', 4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@source
+POSTHOOK: query: insert into source values (1, 'two', 4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@source
+POSTHOOK: Lineage: source.a SCRIPT []
+POSTHOOK: Lineage: source.b SCRIPT []
+POSTHOOK: Lineage: source.c SCRIPT []
+PREHOOK: query: explain
+create external table tbl_ice stored by iceberg stored as orc tblproperties 
('format-version'='2') as
+select a, b, c from source
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@source
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain
+create external table tbl_ice stored by iceberg stored as orc tblproperties 
('format-version'='2') as
+select a, b, c from source
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@source
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-4 is a root stage
+  Stage-1 depends on stages: Stage-4
+  Stage-2 depends on stages: Stage-1
+  Stage-3 depends on stages: Stage-0, Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-4
+    Create Table
+      columns: a int, b string, c int
+      name: default.tbl_ice
+      input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+      location: hdfs://### HDFS PATH ###
+      output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+      serde properties:
+        write.format.default orc
+      storage handler: org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+      table properties:
+        EXTERNAL TRUE
+        format-version 2
+        storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+      isExternal: true
+
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: source
+                  Statistics: Num rows: 2 Data size: 190 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  Select Operator
+                    expressions: a (type: int), b (type: string), c (type: int)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 2 Data size: 190 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 2 Data size: 190 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      table:
+                          input format: 
org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                          output format: 
org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                          serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                          name: default.tbl_ice
+                    Select Operator
+                      expressions: _col0 (type: int), _col1 (type: string), 
_col2 (type: int)
+                      outputColumnNames: col1, col2, col3
+                      Statistics: Num rows: 2 Data size: 190 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      Group By Operator
+                        aggregations: min(col1), max(col1), count(1), 
count(col1), compute_bit_vector_hll(col1), max(length(col2)), 
avg(COALESCE(length(col2),0)), count(col2), compute_bit_vector_hll(col2), 
min(col3), max(col3), count(col3), compute_bit_vector_hll(col3)
+                        minReductionHashAggr: 0.5
+                        mode: hash
+                        outputColumnNames: _col0, _col1, _col2, _col3, _col4, 
_col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
+                        Statistics: Num rows: 1 Data size: 560 Basic stats: 
COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          null sort order: 
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 560 Basic stats: 
COMPLETE Column stats: COMPLETE
+                          value expressions: _col0 (type: int), _col1 (type: 
int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 
(type: int), _col6 (type: struct<count:bigint,sum:double,input:int>), _col7 
(type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), 
_col11 (type: bigint), _col12 (type: binary)
+            Execution mode: vectorized
+        Reducer 2 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), 
count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), 
max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), 
compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), 
count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, 
_col6, _col7, _col8, _col9, _col10, _col11, _col12
+                Statistics: Num rows: 1 Data size: 492 Basic stats: COMPLETE 
Column stats: COMPLETE
+                Select Operator
+                  expressions: 'LONG' (type: string), UDFToLong(_col0) (type: 
bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), 
COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 
'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), 
COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), 
COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 
'LONG' (type: string), UDFToLong(_col9) (typ [...]
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, 
_col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, 
_col16, _col17
+                  Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1 Data size: 794 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+      Column Stats Desc:
+          Columns: a, b, c
+          Column Types: int, string, int
+          Table: default.tbl_ice
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: hdfs://### HDFS PATH ###
+
+PREHOOK: query: create external table tbl_ice stored by iceberg stored as orc 
tblproperties ('format-version'='2') as
+select a, b, c from source
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@source
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: create external table tbl_ice stored by iceberg stored as orc 
tblproperties ('format-version'='2') as
+select a, b, c from source
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@source
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: Lineage: tbl_ice.a SIMPLE [(source)source.FieldSchema(name:a, 
type:int, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.b SIMPLE [(source)source.FieldSchema(name:b, 
type:string, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.c SIMPLE [(source)source.FieldSchema(name:c, 
type:int, comment:null), ]
+PREHOOK: query: select * from tbl_ice
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from tbl_ice
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      one     3
+1      two     4
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
index 99004085770..4828cd85c66 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
@@ -47,12 +47,29 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 OPTIMIZED SQL: SELECT `a`, `b`, `c`
 FROM `default`.`source`
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
+  Stage-4 is a root stage
+  Stage-1 depends on stages: Stage-4
   Stage-2 depends on stages: Stage-1
   Stage-3 depends on stages: Stage-0, Stage-2
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
+  Stage: Stage-4
+    Create Table
+      columns: a int, b string, c int
+      name: default.tbl_ice
+      input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+      location: hdfs://### HDFS PATH ###
+      output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+      serde properties:
+        write.format.default orc
+      storage handler: org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+      table properties:
+        EXTERNAL TRUE
+        format-version 2
+        storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+      isExternal: true
+
   Stage: Stage-1
     Tez
 #### A masked pattern was here ####
@@ -87,9 +104,11 @@ STAGE PLANS:
                             created_with_ctas true
                             format-version 2
                             iceberg.mr.operation.type.default.tbl_ice OTHER
+                            iceberg.mr.serialized.table.default.tbl_ice 
rO0ABXA=
                             iceberg.mr.table.identifier default.tbl_ice
                             iceberg.mr.table.location hdfs://### HDFS PATH ###
                             iceberg.mr.table.schema 
{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"a","required":false,"type":"int"},{"id":2,"name":"b","required":false,"type":"string"},{"id":3,"name":"c","required":false,"type":"int"}]}
+                            location hdfs://### HDFS PATH ###
                             mapred.output.committer.class 
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler$HiveIcebergNoJobCommitter
                             name default.tbl_ice
                             serialization.format 1
@@ -105,6 +124,7 @@ STAGE PLANS:
                             iceberg.mr.operation.type.default.tbl_ice OTHER
                             iceberg.mr.table.partition.spec 
{"spec-id":0,"fields":[{"name":"a_bucket","transform":"bucket[16]","source-id":1,"field-id":1000},{"name":"b_trunc","transform":"truncate[3]","source-id":2,"field-id":1001}]}
                             iceberg.mr.table.schema 
{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"a","required":false,"type":"int"},{"id":2,"name":"b","required":false,"type":"string"},{"id":3,"name":"c","required":false,"type":"int"}]}
+                            location hdfs://### HDFS PATH ###
                             name default.tbl_ice
                             serialization.format 1
                             serialization.lib 
org.apache.iceberg.mr.hive.HiveIcebergSerDe
@@ -246,6 +266,9 @@ POSTHOOK: Input: default@source
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: Lineage: tbl_ice.a SIMPLE [(source)source.FieldSchema(name:a, 
type:int, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.b SIMPLE [(source)source.FieldSchema(name:b, 
type:string, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.c SIMPLE [(source)source.FieldSchema(name:c, 
type:int, comment:null), ]
 PREHOOK: query: describe formatted tbl_ice
 PREHOOK: type: DESCTABLE
 PREHOOK: Input: default@tbl_ice
@@ -265,13 +288,13 @@ b                         TRUNCATE[3]
 # Detailed Table Information            
 Database:              default                  
 #### A masked pattern was here ####
-Retention:             2147483647               
+Retention:             0                        
 #### A masked pattern was here ####
 Table Type:            EXTERNAL_TABLE           
 Table Parameters:               
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}}
        EXTERNAL                TRUE                
-       bucketing_version       -1                  
+       bucketing_version       2                   
        engine.hive.enabled     true                
        format-version          2                   
        iceberg.orc.files.only  true                
@@ -285,7 +308,10 @@ Table Parameters:
        totalSize               812                 
 #### A masked pattern was here ####
        uuid                    #Masked#
+       write.delete.mode       merge-on-read       
        write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
                 
 # Storage Information           
 SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
index 75179f38cac..5703cc3a104 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
@@ -72,8 +72,6 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
-
 /**
  * DDL task description for CREATE TABLE commands.
  */
@@ -520,7 +518,6 @@ public class CreateTableDesc implements DDLDesc, 
Serializable {
   @Explain(displayName = "table properties")
   public Map<String, String> getTblPropsExplain() { // only for displaying plan
     return PlanUtils.getPropertiesForExplain(tblProps,
-            EXPLAIN_CTAS_LOCATION,
             hive_metastoreConstants.TABLE_IS_CTAS,
             hive_metastoreConstants.TABLE_BUCKETING_VERSION);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index eab5b4803ff..41396c1e42b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -1069,10 +1069,15 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
     } else if (moveWork.getLoadFileWork() != null) {
       // Get the info from the create table data
       CreateTableDesc createTableDesc = 
moveWork.getLoadFileWork().getCtasCreateTableDesc();
+      String location = null;
       if (createTableDesc != null) {
         storageHandlerClass = createTableDesc.getStorageHandler();
         commitProperties = new Properties();
         commitProperties.put(hive_metastoreConstants.META_TABLE_NAME, 
createTableDesc.getDbTableName());
+        location = createTableDesc.getLocation();
+      }
+      if (location != null) {
+        commitProperties.put(hive_metastoreConstants.META_TABLE_LOCATION, 
location);
       }
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 37d3497e694..b82d6456c83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -244,17 +244,15 @@ public interface HiveStorageHandler extends Configurable {
   }
 
   /**
-   * Check if CTAS operations should behave in a direct-insert manner (i.e. no 
move task).
-   *
-   * If true, the compiler will not include the table creation task and move 
task into the execution plan.
-   * Instead, it's the responsibility of storage handler/serde to create the 
table during the compilation phase.
+   * Check if CTAS and CMV operations should behave in a direct-insert manner 
(i.e. no move task).
+   * <p>
    * Please note that the atomicity of the operation will suffer in this case, 
i.e. the created table might become
-   * exposed, depending on the implementation, before the CTAS operations 
finishes.
+   * exposed, depending on the implementation, before the CTAS or CMV 
operation finishes.
    * Rollback (e.g. dropping the table) is also the responsibility of the 
storage handler in case of failures.
    *
-   * @return whether direct insert CTAS is required
+   * @return whether direct insert CTAS or CMV is required
    */
-  default boolean directInsertCTAS() {
+  default boolean directInsert() {
     return false;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a6d5719bec3..7631861b6bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.parse;
 import static java.util.Objects.nonNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.DYNAMICPARTITIONCONVERT;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEARCHIVEENABLED;
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_DEFAULT_STORAGE_HANDLER;
@@ -7750,7 +7749,6 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       try {
         if (tblDesc == null) {
           if (viewDesc != null) {
-            destinationTable = viewDesc.toTable(conf);
             tableDescriptor = PlanUtils.getTableDesc(viewDesc, cols, colTypes);
           } else if (qb.getIsQuery()) {
             Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
@@ -7768,28 +7766,16 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
             }
             tableDescriptor = PlanUtils.getDefaultQueryOutputTableDesc(cols, 
colTypes, fileFormat,
                 serdeClass);
-            destinationTable = null;
           } else {
             tableDescriptor = 
PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
           }
         } else {
-          destinationTable = 
db.getTranslateTableDryrun(tblDesc.toTable(conf).getTTable());
-          if (ctx.isExplainPlan() &&
-                  tblDesc.getTblProps().containsKey(TABLE_IS_CTAS) &&
-                  !tblDesc.getTblProps().containsKey(META_TABLE_LOCATION)) {
-            if (destinationTable.getDataLocation() == null) {
-              // no metastore.metadata.transformer.class was set
-              tblDesc.getTblProps().put(EXPLAIN_CTAS_LOCATION, new 
Warehouse(conf).getDefaultTablePath(
-                      destinationTable.getDbName(),
-                      destinationTable.getTableName(),
-                      
Boolean.parseBoolean(destinationTable.getParameters().get("EXTERNAL"))).toString());
-            } else {
-              tblDesc.getTblProps().put(EXPLAIN_CTAS_LOCATION, 
destinationTable.getDataLocation().toString());
-            }
+          if (tblDesc.isCTAS() && tblDesc.getStorageHandler() != null) {
+            tblDesc.setLocation(getCtasOrCMVLocation(tblDesc, viewDesc, 
createTableUseSuffix).toString());
           }
           tableDescriptor = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
         }
-      } catch (HiveException | MetaException e) {
+      } catch (HiveException e) {
         throw new SemanticException(e);
       }
 
@@ -7810,6 +7796,16 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       boolean isDfsDir = (destType == QBMetaData.DEST_DFS_FILE);
 
+      try {
+        if (tblDesc != null) {
+          destinationTable = 
db.getTranslateTableDryrun(tblDesc.toTable(conf).getTTable());
+        } else {
+          destinationTable = viewDesc != null ? viewDesc.toTable(conf) : null;
+        }
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+
       destTableIsFullAcid = AcidUtils.isFullAcidTable(destinationTable);
 
       // Data organization (DISTRIBUTED, SORTED, CLUSTERED) for materialized 
view
@@ -14041,9 +14037,6 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
           isTransactional, isManaged, new String[]{qualifiedTabName.getDb(), 
qualifiedTabName.getTable()}, isDefaultTableTypeChanged);
       isExt = isExternalTableChanged(tblProps, isTransactional, isExt, 
isDefaultTableTypeChanged);
       tblProps.put(TABLE_IS_CTAS, "true");
-      if (ctx.isExplainPlan()) {
-        tblProps.put(EXPLAIN_CTAS_LOCATION, "");
-      }
       addDbAndTabToOutputs(new String[] {qualifiedTabName.getDb(), 
qualifiedTabName.getTable()},
           TableType.MANAGED_TABLE, isTemporary, tblProps, storageFormat);
       tableDesc = new CreateTableDesc(qualifiedTabName, isExt, isTemporary, 
cols,
@@ -14094,7 +14087,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     WriteType lockType = tblProps != null && 
Boolean.parseBoolean(tblProps.get(TABLE_IS_CTAS))
         && AcidUtils.isExclusiveCTASEnabled(conf)
         // iceberg CTAS has it's own locking mechanism, therefore we should 
exclude them
-        && (t.getStorageHandler() == null || 
!t.getStorageHandler().directInsertCTAS()) ?
+        && (t.getStorageHandler() == null || 
!t.getStorageHandler().directInsert()) ?
       WriteType.CTAS : WriteType.DDL_NO_LOCK;
     
     outputs.add(new WriteEntity(t, lockType));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 193af2d695c..3298156d6ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -136,11 +136,11 @@ public abstract class TaskCompiler {
     boolean isCStats = pCtx.getQueryProperties().isAnalyzeRewrite();
     int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();
 
-    boolean directInsertCtas = false;
+    boolean directInsert = false;
     if (pCtx.getCreateTable() != null && 
pCtx.getCreateTable().getStorageHandler() != null) {
       try {
-        directInsertCtas =
-            HiveUtils.getStorageHandler(conf, 
pCtx.getCreateTable().getStorageHandler()).directInsertCTAS();
+        directInsert =
+            HiveUtils.getStorageHandler(conf, 
pCtx.getCreateTable().getStorageHandler()).directInsert();
       } catch (HiveException e) {
         throw new SemanticException("Failed to load storage handler:  " + 
e.getMessage());
       }
@@ -303,6 +303,22 @@ public abstract class TaskCompiler {
       setInputFormat(rootTask);
     }
 
+    if (directInsert) {
+      Task<?> crtTask = null;
+      if (pCtx.getCreateTable() != null) {
+        CreateTableDesc crtTblDesc = pCtx.getCreateTable();
+        crtTblDesc.validate(conf);
+        crtTask = TaskFactory.get(new DDLWork(inputs, outputs, crtTblDesc));
+      }
+      if (crtTask != null) {
+        for (Task<?> rootTask : rootTasks) {
+          crtTask.addDependentTask(rootTask);
+          rootTasks.clear();
+          rootTasks.add(crtTask);
+        }
+      }
+    }
+
     optimizeTaskPlan(rootTasks, pCtx, ctx);
 
     /*
@@ -373,14 +389,14 @@ public abstract class TaskCompiler {
 
     // for direct insert CTAS, we don't need this table creation DDL task, 
since the table will be created
     // ahead of time by the non-native table
-    if (pCtx.getQueryProperties().isCTAS() && 
!pCtx.getCreateTable().isMaterialization() && !directInsertCtas) {
+    if (pCtx.getQueryProperties().isCTAS() && 
!pCtx.getCreateTable().isMaterialization() && !directInsert) {
       // generate a DDL task and make it a dependent task of the leaf
       CreateTableDesc crtTblDesc = pCtx.getCreateTable();
       crtTblDesc.validate(conf);
       Task<?> crtTblTask = TaskFactory.get(new DDLWork(inputs, outputs, 
crtTblDesc));
       patchUpAfterCTASorMaterializedView(rootTasks, inputs, outputs, 
crtTblTask,
           CollectionUtils.isEmpty(crtTblDesc.getPartColNames()));
-    } else if (pCtx.getQueryProperties().isMaterializedView()) {
+    } else if (pCtx.getQueryProperties().isMaterializedView() && 
!directInsert) {
       // generate a DDL task and make it a dependent task of the leaf
       CreateMaterializedViewDesc viewDesc = pCtx.getCreateViewDesc();
       Task<?> crtViewTask = TaskFactory.get(new DDLWork(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 80bab489620..947250cd815 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
 import static 
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
 import static org.apache.hive.common.util.HiveStringUtils.quoteComments;
 
@@ -1222,7 +1221,7 @@ public final class PlanUtils {
     return LazySimpleSerDe.class;
   }
 
-  private static final String[] FILTER_OUT_FROM_EXPLAIN = {TABLE_IS_CTAS, 
EXPLAIN_CTAS_LOCATION};
+  private static final String[] FILTER_OUT_FROM_EXPLAIN = {TABLE_IS_CTAS};
 
   /**
    * Get a Map of table or partition properties to be used in explain extended 
output.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 12366eff289..edc8e6fdeca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -37,8 +37,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
-
 /**
  * TableDesc.
  *
@@ -139,15 +137,11 @@ public class TableDesc implements Serializable, Cloneable 
{
     this.jobProperties = jobProperties;
   }
 
+  @Explain(displayName = "jobProperties", explainLevels = { Level.EXTENDED })
   public Map<String, String> getJobProperties() {
     return jobProperties;
   }
 
-  @Explain(displayName = "jobProperties", explainLevels = { Level.EXTENDED })
-  public Map<String, String> getJobPropertiesExplain() {
-    return PlanUtils.getPropertiesForExplain(jobProperties, 
EXPLAIN_CTAS_LOCATION);
-  }
-
   public void setJobSecrets(Map<String, String> jobSecrets) {
     this.jobSecrets = jobSecrets;
   }

Reply via email to