This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 43b1525f96b HIVE-26771: Use DDLTask to create Iceberg table when
running ctas statement (Krisztian Kasa, reviewed by Denys Kuzmenko)
43b1525f96b is described below
commit 43b1525f96bece0bf46dfab8e73d0e1b51093555
Author: Krisztian Kasa <[email protected]>
AuthorDate: Wed Nov 30 15:59:45 2022 +0100
HIVE-26771: Use DDLTask to create Iceberg table when running ctas
statement (Krisztian Kasa, reviewed by Denys Kuzmenko)
Closes #3802
---
.../org/apache/hadoop/hive/conf/Constants.java | 2 -
.../main/java/org/apache/iceberg/mr/Catalogs.java | 2 -
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 16 +-
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 74 +--------
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 30 +++-
.../org/apache/iceberg/mr/hive/HiveTableUtil.java | 67 ++++++++
.../iceberg/mr/hive/TestHiveIcebergCTAS.java | 8 +-
.../src/test/queries/positive/ctas_iceberg_orc.q | 15 ++
.../test/results/positive/ctas_iceberg_orc.q.out | 174 +++++++++++++++++++++
.../positive/ctas_iceberg_partitioned_orc.q.out | 32 +++-
.../hive/ql/ddl/table/create/CreateTableDesc.java | 3 -
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 5 +
.../hive/ql/metadata/HiveStorageHandler.java | 12 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 35 ++---
.../apache/hadoop/hive/ql/parse/TaskCompiler.java | 26 ++-
.../org/apache/hadoop/hive/ql/plan/PlanUtils.java | 3 +-
.../org/apache/hadoop/hive/ql/plan/TableDesc.java | 8 +-
17 files changed, 377 insertions(+), 135 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 0ebb578d10b..d39c671cac4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -105,6 +105,4 @@ public class Constants {
public static final String HTTP_HEADER_REQUEST_TRACK = "X-Request-ID";
public static final String TIME_POSTFIX_REQUEST_TRACK = "_TIME";
-
- public static final String EXPLAIN_CTAS_LOCATION = "explainCtasLocation";
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index d716e2d6148..d77ede333d1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
-import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -225,7 +224,6 @@ public final class Catalogs {
return new HadoopTables(conf).create(schema, spec, map, location);
}
- @VisibleForTesting
static Optional<Catalog> loadCatalog(Configuration conf, String catalogName)
{
String catalogType = getCatalogType(conf, catalogName);
if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 7ce40fa00a7..53bbb7d149e 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -239,11 +239,23 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
setFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
String metadataLocation =
hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+ Table table;
if (metadataLocation != null) {
- Catalogs.registerTable(conf, catalogProperties, metadataLocation);
+ table = Catalogs.registerTable(conf, catalogProperties,
metadataLocation);
} else {
- Catalogs.createTable(conf, catalogProperties);
+ table = Catalogs.createTable(conf, catalogProperties);
}
+
+ if (!HiveTableUtil.isCtas(catalogProperties)) {
+ return;
+ }
+
+ // set this in the query state so that we can rollback the table in the
lifecycle hook in case of failures
+ String tableIdentifier = catalogProperties.getProperty(Catalogs.NAME);
+ SessionStateUtil.addResource(conf, InputFormatConfig.CTAS_TABLE_NAME,
tableIdentifier);
+ SessionStateUtil.addResource(conf, tableIdentifier, table);
+
+ HiveTableUtil.createFileForTableObject(table, conf);
}
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 6a6aa0884f0..681599d39a2 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -25,17 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import
org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
-import org.apache.hadoop.hive.ql.session.SessionStateUtil;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -50,7 +44,6 @@ import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hive.HiveSchemaUtil;
@@ -120,16 +113,11 @@ public class HiveIcebergSerDe extends AbstractSerDe {
this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
// This is only for table creation, it is ok to have an empty
partition column list
this.partitionColumns = ImmutableList.of();
- // create table for CTAS
- if (e instanceof NoSuchTableException &&
-
Boolean.parseBoolean(serDeProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTAS)))
{
- if (!Catalogs.hiveCatalog(configuration, serDeProperties)) {
- throw new SerDeException(CTAS_EXCEPTION_MSG);
- }
- if (!serDeProperties.containsKey(Constants.EXPLAIN_CTAS_LOCATION)) {
- createTableForCTAS(configuration, serDeProperties);
- }
+ if (e instanceof NoSuchTableException &&
+ HiveTableUtil.isCtas(serDeProperties) &&
+ !Catalogs.hiveCatalog(configuration, serDeProperties)) {
+ throw new SerDeException(CTAS_EXCEPTION_MSG);
}
}
}
@@ -182,60 +170,6 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
}
- private void createTableForCTAS(Configuration configuration, Properties
serDeProperties) {
- serDeProperties.setProperty(InputFormatConfig.TABLE_SCHEMA,
SchemaParser.toJson(tableSchema));
-
- // Spec for the PARTITIONED BY SPEC queries (partition stored in the
SessionState)
- PartitionSpec spec = IcebergTableUtil.spec(configuration, tableSchema);
- if (spec == null && !getPartitionColumnNames().isEmpty()) {
- // Spec for the PARTITIONED BY queries (partitioned columns created by
the compiler)
- List<FieldSchema> partitionFields = IntStream.range(0,
getPartitionColumnNames().size())
- .mapToObj(i ->
- new FieldSchema(getPartitionColumnNames().get(i),
getPartitionColumnTypes().get(i).getTypeName(), null))
- .collect(Collectors.toList());
- spec = HiveSchemaUtil.spec(tableSchema, partitionFields);
- }
-
- if (spec != null) {
- serDeProperties.put(InputFormatConfig.PARTITION_SPEC,
PartitionSpecParser.toJson(spec));
- }
-
- // clean up the properties for table creation (so that internal serde
props don't become table props)
- Properties createProps = getCTASTableCreationProperties(serDeProperties);
-
- // create CTAS table
- LOG.info("Creating table {} for CTAS with schema: {}, and spec: {}",
- serDeProperties.get(Catalogs.NAME), tableSchema,
serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
- Catalogs.createTable(configuration, createProps);
-
- // set this in the query state so that we can rollback the table in the
lifecycle hook in case of failures
- SessionStateUtil.addResource(configuration,
InputFormatConfig.CTAS_TABLE_NAME,
- serDeProperties.getProperty(Catalogs.NAME));
- }
-
- private Properties getCTASTableCreationProperties(Properties
serDeProperties) {
- Properties tblProps = (Properties) serDeProperties.clone();
-
- // remove the serialization-only related props
- tblProps.remove(serdeConstants.LIST_PARTITION_COLUMNS);
- tblProps.remove(serdeConstants.LIST_PARTITION_COLUMN_TYPES);
- tblProps.remove(serdeConstants.LIST_PARTITION_COLUMN_COMMENTS);
-
- tblProps.remove(serdeConstants.LIST_COLUMNS);
- tblProps.remove(serdeConstants.LIST_COLUMN_TYPES);
- tblProps.remove(serdeConstants.LIST_COLUMN_COMMENTS);
-
- tblProps.remove(serdeConstants.COLUMN_NAME_DELIMITER);
- tblProps.remove(serdeConstants.SERIALIZATION_LIB);
- tblProps.remove(hive_metastoreConstants.TABLE_IS_CTAS);
-
- // add the commonly-needed table properties
- HiveIcebergMetaHook.COMMON_HMS_PROPERTIES.forEach(tblProps::putIfAbsent);
- tblProps.setProperty(TableProperties.ENGINE_HIVE_ENABLED, "true");
-
- return tblProps;
- }
-
@Override
public Class<? extends Writable> getSerializedClass() {
return Container.class;
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 33be9da085f..2241ded4719 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.SnapshotContext;
import org.apache.hadoop.hive.common.type.Timestamp;
-import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
@@ -263,7 +262,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
@Override
- public boolean directInsertCTAS() {
+ public boolean directInsert() {
return true;
}
@@ -462,7 +461,11 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
@Override
public void storageHandlerCommit(Properties commitProperties, boolean
overwrite) throws HiveException {
String tableName = commitProperties.getProperty(Catalogs.NAME);
+ String location = commitProperties.getProperty(Catalogs.LOCATION);
Configuration configuration = SessionState.getSessionConf();
+ if (location != null) {
+ HiveTableUtil.cleanupTableObjectFile(location, configuration);
+ }
List<JobContext> jobContextList = generateJobContext(configuration,
tableName, overwrite);
if (jobContextList.isEmpty()) {
return;
@@ -729,6 +732,11 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
*/
public static Table table(Configuration config, String name) {
Table table =
SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX
+ name));
+ if (table == null &&
+ config.getBoolean(hive_metastoreConstants.TABLE_IS_CTAS, false) &&
+
StringUtils.isNotBlank(config.get(InputFormatConfig.TABLE_LOCATION))) {
+ table = HiveTableUtil.readTableObjectFromFile(config);
+ }
checkAndSetIoConfig(config, table);
return table;
}
@@ -759,7 +767,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
*/
public static void checkAndSkipIoConfigSerialization(Configuration config,
Table table) {
if (table != null &&
config.getBoolean(InputFormatConfig.CONFIG_SERIALIZATION_DISABLED,
- InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT) && table.io()
instanceof HadoopConfigurable) {
+ InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT) &&
table.io() instanceof HadoopConfigurable) {
((HadoopConfigurable) table.io()).serializeConfWith(conf -> new
NonSerializingConfig(config)::get);
}
}
@@ -831,16 +839,19 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX +
tableDesc.getTableName(),
SerializationUtil.serializeToBase64(serializableTable));
} catch (NoSuchTableException ex) {
- if
(!(StringUtils.isNotBlank(props.getProperty(hive_metastoreConstants.TABLE_IS_CTAS))
&&
-
StringUtils.isNotBlank(props.getProperty(Constants.EXPLAIN_CTAS_LOCATION)))) {
+ if (!HiveTableUtil.isCtas(props)) {
throw ex;
}
- location = map.get(hive_metastoreConstants.META_TABLE_LOCATION);
- if (StringUtils.isBlank(location)) {
- location = props.getProperty(Constants.EXPLAIN_CTAS_LOCATION);
+ if (!Catalogs.hiveCatalog(configuration, props)) {
+ throw new
UnsupportedOperationException(HiveIcebergSerDe.CTAS_EXCEPTION_MSG);
}
+ location = map.get(hive_metastoreConstants.META_TABLE_LOCATION);
+
+ map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX +
tableDesc.getTableName(),
+ SerializationUtil.serializeToBase64(null));
+
try {
AbstractSerDe serDe = tableDesc.getDeserializer(configuration);
HiveIcebergSerDe icebergSerDe = (HiveIcebergSerDe) serDe;
@@ -860,6 +871,9 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
// save schema into table props as well to avoid repeatedly hitting the
HMS during serde initializations
// this is an exception to the interface documentation, but it's a safe
operation to add this property
props.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
+ if (spec == null) {
+ spec = PartitionSpec.unpartitioned();
+ }
props.put(InputFormatConfig.PARTITION_SPEC,
PartitionSpecParser.toJson(spec));
// We need to remove this otherwise the job.xml will be invalid as column
comments are separated with '\0' and
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
index 8c51de4c930..fd45ff488f1 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
@@ -20,6 +20,9 @@
package org.apache.iceberg.mr.hive;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -37,22 +40,37 @@ import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HiveTableUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveTableUtil.class);
+
+ static final String TABLE_EXTENSION = ".table";
+
private HiveTableUtil() {
}
@@ -142,4 +160,53 @@ public class HiveTableUtil {
throw new MetaException("Exception happened during the collection of
file statuses.\n" + e.getMessage());
}
}
+
+ static String generateTableObjectLocation(String tableLocation,
Configuration conf) {
+ return tableLocation + "/temp/" +
conf.get(HiveConf.ConfVars.HIVEQUERYID.varname) + TABLE_EXTENSION;
+ }
+
+ static void createFileForTableObject(Table table, Configuration conf) {
+ String filePath = generateTableObjectLocation(table.location(), conf);
+
+ Table serializableTable = SerializableTable.copyOf(table);
+ HiveIcebergStorageHandler.checkAndSkipIoConfigSerialization(conf,
serializableTable);
+ String serialized = SerializationUtil.serializeToBase64(serializableTable);
+
+ OutputFile serializedTableFile = table.io().newOutputFile(filePath);
+ try (ObjectOutputStream oos = new
ObjectOutputStream(serializedTableFile.createOrOverwrite())) {
+ oos.writeObject(serialized);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ LOG.debug("Iceberg table metadata file is created {}",
serializedTableFile);
+ }
+
+ static void cleanupTableObjectFile(String location, Configuration
configuration) {
+ String tableObjectLocation =
HiveTableUtil.generateTableObjectLocation(location, configuration);
+ try {
+ Path toDelete = new Path(tableObjectLocation);
+ FileSystem fs = Util.getFs(toDelete, configuration);
+ fs.delete(toDelete, true);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ static Table readTableObjectFromFile(Configuration config) {
+ String location = config.get(InputFormatConfig.TABLE_LOCATION);
+ String filePath = HiveTableUtil.generateTableObjectLocation(location,
config);
+
+ try (FileIO io = new HadoopFileIO(config)) {
+ try (ObjectInputStream ois = new
ObjectInputStream(io.newInputFile(filePath).newStream())) {
+ return SerializationUtil.deserializeFromBase64((String)
ois.readObject());
+ }
+ } catch (ClassNotFoundException | IOException e) {
+ throw new NotFoundException("Can not read or parse table object file:
%s", filePath);
+ }
+ }
+
+ public static boolean isCtas(Properties properties) {
+ return
Boolean.parseBoolean(properties.getProperty(hive_metastoreConstants.TABLE_IS_CTAS));
+ }
+
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
index 8ecb9e31bd0..8cecbc759ce 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
@@ -92,7 +92,8 @@ public class TestHiveIcebergCTAS extends
HiveIcebergStorageHandlerWithEngineBase
org.apache.hadoop.hive.metastore.api.Table hmsTable =
shell.metastore().getTable("default", "target");
Assert.assertEquals(3, hmsTable.getSd().getColsSize());
Assert.assertTrue(hmsTable.getPartitionKeys().isEmpty());
- Assert.assertEquals(fileFormat.toString(),
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
+ Assert.assertEquals(
+ fileFormat.toString().toLowerCase(),
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
// check Iceberg table has correct partition spec
Table table = testTables.loadTable(TableIdentifier.of("default",
"target"));
@@ -167,7 +168,8 @@ public class TestHiveIcebergCTAS extends
HiveIcebergStorageHandlerWithEngineBase
org.apache.hadoop.hive.metastore.api.Table hmsTable =
shell.metastore().getTable("default", "target");
Assert.assertEquals(8, hmsTable.getSd().getColsSize());
Assert.assertTrue(hmsTable.getPartitionKeys().isEmpty());
- Assert.assertEquals(fileFormat.toString(),
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
+ Assert.assertEquals(
+ fileFormat.toString().toLowerCase(),
hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
// check Iceberg table has correct partition spec
Table table = testTables.loadTable(TableIdentifier.of("default",
"target"));
@@ -213,7 +215,7 @@ public class TestHiveIcebergCTAS extends
HiveIcebergStorageHandlerWithEngineBase
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
shell.executeStatement(String.format(
- "CREATE TABLE target STORED BY ICEBERG STORED AS %s %s AS SELECT *
FROM source",
+ "CREATE EXTERNAL TABLE target STORED BY ICEBERG STORED AS %s %s AS
SELECT * FROM source",
fileFormat,
testTables.locationForCreateTableSQL(TableIdentifier.of("default", "target"))));
List<Object[]> objects = shell.executeStatement("SELECT * FROM target
ORDER BY customer_id");
diff --git
a/iceberg/iceberg-handler/src/test/queries/positive/ctas_iceberg_orc.q
b/iceberg/iceberg-handler/src/test/queries/positive/ctas_iceberg_orc.q
new file mode 100644
index 00000000000..0ab773daed3
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/ctas_iceberg_orc.q
@@ -0,0 +1,15 @@
+set hive.explain.user=false;
+
+create table source(a int, b string, c int);
+
+insert into source values (1, 'one', 3);
+insert into source values (1, 'two', 4);
+
+explain
+create external table tbl_ice stored by iceberg stored as orc tblproperties
('format-version'='2') as
+select a, b, c from source;
+
+create external table tbl_ice stored by iceberg stored as orc tblproperties
('format-version'='2') as
+select a, b, c from source;
+
+select * from tbl_ice;
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_orc.q.out
new file mode 100644
index 00000000000..a110715c9c1
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_orc.q.out
@@ -0,0 +1,174 @@
+PREHOOK: query: create table source(a int, b string, c int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source
+POSTHOOK: query: create table source(a int, b string, c int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source
+PREHOOK: query: insert into source values (1, 'one', 3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@source
+POSTHOOK: query: insert into source values (1, 'one', 3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@source
+POSTHOOK: Lineage: source.a SCRIPT []
+POSTHOOK: Lineage: source.b SCRIPT []
+POSTHOOK: Lineage: source.c SCRIPT []
+PREHOOK: query: insert into source values (1, 'two', 4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@source
+POSTHOOK: query: insert into source values (1, 'two', 4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@source
+POSTHOOK: Lineage: source.a SCRIPT []
+POSTHOOK: Lineage: source.b SCRIPT []
+POSTHOOK: Lineage: source.c SCRIPT []
+PREHOOK: query: explain
+create external table tbl_ice stored by iceberg stored as orc tblproperties
('format-version'='2') as
+select a, b, c from source
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@source
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain
+create external table tbl_ice stored by iceberg stored as orc tblproperties
('format-version'='2') as
+select a, b, c from source
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@source
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+ Stage-4 is a root stage
+ Stage-1 depends on stages: Stage-4
+ Stage-2 depends on stages: Stage-1
+ Stage-3 depends on stages: Stage-0, Stage-2
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-4
+ Create Table
+ columns: a int, b string, c int
+ name: default.tbl_ice
+ input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ location: hdfs://### HDFS PATH ###
+ output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde properties:
+ write.format.default orc
+ storage handler: org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+ table properties:
+ EXTERNAL TRUE
+ format-version 2
+ storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+ isExternal: true
+
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: source
+ Statistics: Num rows: 2 Data size: 190 Basic stats: COMPLETE
Column stats: COMPLETE
+ Select Operator
+ expressions: a (type: int), b (type: string), c (type: int)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2 Data size: 190 Basic stats:
COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 2 Data size: 190 Basic stats:
COMPLETE Column stats: COMPLETE
+ table:
+ input format:
org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ output format:
org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+ name: default.tbl_ice
+ Select Operator
+ expressions: _col0 (type: int), _col1 (type: string),
_col2 (type: int)
+ outputColumnNames: col1, col2, col3
+ Statistics: Num rows: 2 Data size: 190 Basic stats:
COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: min(col1), max(col1), count(1),
count(col1), compute_bit_vector_hll(col1), max(length(col2)),
avg(COALESCE(length(col2),0)), count(col2), compute_bit_vector_hll(col2),
min(col3), max(col3), count(col3), compute_bit_vector_hll(col3)
+ minReductionHashAggr: 0.5
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
+ Statistics: Num rows: 1 Data size: 560 Basic stats:
COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ null sort order:
+ sort order:
+ Statistics: Num rows: 1 Data size: 560 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: int), _col1 (type:
int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5
(type: int), _col6 (type: struct<count:bigint,sum:double,input:int>), _col7
(type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int),
_col11 (type: bigint), _col12 (type: binary)
+ Execution mode: vectorized
+ Reducer 2
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1),
count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4),
max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7),
compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10),
count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5,
_col6, _col7, _col8, _col9, _col10, _col11, _col12
+ Statistics: Num rows: 1 Data size: 492 Basic stats: COMPLETE
Column stats: COMPLETE
+ Select Operator
+ expressions: 'LONG' (type: string), UDFToLong(_col0) (type:
bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint),
COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary),
'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint),
COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint),
COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary),
'LONG' (type: string), UDFToLong(_col9) (typ [...]
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5,
_col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15,
_col16, _col17
+ Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE
Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 794 Basic stats:
COMPLETE Column stats: COMPLETE
+ table:
+ input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde:
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-3
+ Stats Work
+ Basic Stats Work:
+ Column Stats Desc:
+ Columns: a, b, c
+ Column Types: int, string, int
+ Table: default.tbl_ice
+
+ Stage: Stage-0
+ Move Operator
+ files:
+ hdfs directory: true
+ destination: hdfs://### HDFS PATH ###
+
+PREHOOK: query: create external table tbl_ice stored by iceberg stored as orc
tblproperties ('format-version'='2') as
+select a, b, c from source
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@source
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: create external table tbl_ice stored by iceberg stored as orc
tblproperties ('format-version'='2') as
+select a, b, c from source
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@source
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: Lineage: tbl_ice.a SIMPLE [(source)source.FieldSchema(name:a,
type:int, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.b SIMPLE [(source)source.FieldSchema(name:b,
type:string, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.c SIMPLE [(source)source.FieldSchema(name:c,
type:int, comment:null), ]
+PREHOOK: query: select * from tbl_ice
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from tbl_ice
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 one 3
+1 two 4
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
index 99004085770..4828cd85c66 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
@@ -47,12 +47,29 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
OPTIMIZED SQL: SELECT `a`, `b`, `c`
FROM `default`.`source`
STAGE DEPENDENCIES:
- Stage-1 is a root stage
+ Stage-4 is a root stage
+ Stage-1 depends on stages: Stage-4
Stage-2 depends on stages: Stage-1
Stage-3 depends on stages: Stage-0, Stage-2
Stage-0 depends on stages: Stage-1
STAGE PLANS:
+ Stage: Stage-4
+ Create Table
+ columns: a int, b string, c int
+ name: default.tbl_ice
+ input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ location: hdfs://### HDFS PATH ###
+ output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde properties:
+ write.format.default orc
+ storage handler: org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+ table properties:
+ EXTERNAL TRUE
+ format-version 2
+ storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+ isExternal: true
+
Stage: Stage-1
Tez
#### A masked pattern was here ####
@@ -87,9 +104,11 @@ STAGE PLANS:
created_with_ctas true
format-version 2
iceberg.mr.operation.type.default.tbl_ice OTHER
+ iceberg.mr.serialized.table.default.tbl_ice
rO0ABXA=
iceberg.mr.table.identifier default.tbl_ice
iceberg.mr.table.location hdfs://### HDFS PATH ###
iceberg.mr.table.schema
{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"a","required":false,"type":"int"},{"id":2,"name":"b","required":false,"type":"string"},{"id":3,"name":"c","required":false,"type":"int"}]}
+ location hdfs://### HDFS PATH ###
mapred.output.committer.class
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler$HiveIcebergNoJobCommitter
name default.tbl_ice
serialization.format 1
@@ -105,6 +124,7 @@ STAGE PLANS:
iceberg.mr.operation.type.default.tbl_ice OTHER
iceberg.mr.table.partition.spec
{"spec-id":0,"fields":[{"name":"a_bucket","transform":"bucket[16]","source-id":1,"field-id":1000},{"name":"b_trunc","transform":"truncate[3]","source-id":2,"field-id":1001}]}
iceberg.mr.table.schema
{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"a","required":false,"type":"int"},{"id":2,"name":"b","required":false,"type":"string"},{"id":3,"name":"c","required":false,"type":"int"}]}
+ location hdfs://### HDFS PATH ###
name default.tbl_ice
serialization.format 1
serialization.lib
org.apache.iceberg.mr.hive.HiveIcebergSerDe
@@ -246,6 +266,9 @@ POSTHOOK: Input: default@source
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: Lineage: tbl_ice.a SIMPLE [(source)source.FieldSchema(name:a,
type:int, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.b SIMPLE [(source)source.FieldSchema(name:b,
type:string, comment:null), ]
+POSTHOOK: Lineage: tbl_ice.c SIMPLE [(source)source.FieldSchema(name:c,
type:int, comment:null), ]
PREHOOK: query: describe formatted tbl_ice
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@tbl_ice
@@ -265,13 +288,13 @@ b TRUNCATE[3]
# Detailed Table Information
Database: default
#### A masked pattern was here ####
-Retention: 2147483647
+Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}}
EXTERNAL TRUE
- bucketing_version -1
+ bucketing_version 2
engine.hive.enabled true
format-version 2
iceberg.orc.files.only true
@@ -285,7 +308,10 @@ Table Parameters:
totalSize 812
#### A masked pattern was here ####
uuid #Masked#
+ write.delete.mode merge-on-read
write.format.default orc
+ write.merge.mode merge-on-read
+ write.update.mode merge-on-read
# Storage Information
SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
index 75179f38cac..5703cc3a104 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
@@ -72,8 +72,6 @@ import org.apache.hadoop.mapred.OutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
-
/**
* DDL task description for CREATE TABLE commands.
*/
@@ -520,7 +518,6 @@ public class CreateTableDesc implements DDLDesc,
Serializable {
@Explain(displayName = "table properties")
public Map<String, String> getTblPropsExplain() { // only for displaying plan
return PlanUtils.getPropertiesForExplain(tblProps,
- EXPLAIN_CTAS_LOCATION,
hive_metastoreConstants.TABLE_IS_CTAS,
hive_metastoreConstants.TABLE_BUCKETING_VERSION);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index eab5b4803ff..41396c1e42b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -1069,10 +1069,15 @@ public class MoveTask extends Task<MoveWork> implements
Serializable {
} else if (moveWork.getLoadFileWork() != null) {
// Get the info from the create table data
CreateTableDesc createTableDesc =
moveWork.getLoadFileWork().getCtasCreateTableDesc();
+ String location = null;
if (createTableDesc != null) {
storageHandlerClass = createTableDesc.getStorageHandler();
commitProperties = new Properties();
commitProperties.put(hive_metastoreConstants.META_TABLE_NAME,
createTableDesc.getDbTableName());
+ location = createTableDesc.getLocation();
+ }
+ if (location != null) {
+ commitProperties.put(hive_metastoreConstants.META_TABLE_LOCATION,
location);
}
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 37d3497e694..b82d6456c83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -244,17 +244,15 @@ public interface HiveStorageHandler extends Configurable {
}
/**
- * Check if CTAS operations should behave in a direct-insert manner (i.e. no
move task).
- *
- * If true, the compiler will not include the table creation task and move
task into the execution plan.
- * Instead, it's the responsibility of storage handler/serde to create the
table during the compilation phase.
+ * Check if CTAS and CMV operations should behave in a direct-insert manner
(i.e. no move task).
+ * <p>
* Please note that the atomicity of the operation will suffer in this case,
i.e. the created table might become
- * exposed, depending on the implementation, before the CTAS operations
finishes.
+ * exposed, depending on the implementation, before the CTAS or CMV
operations finish.
* Rollback (e.g. dropping the table) is also the responsibility of the
storage handler in case of failures.
*
- * @return whether direct insert CTAS is required
+ * @return whether direct insert CTAS or CMV is required
*/
- default boolean directInsertCTAS() {
+ default boolean directInsert() {
return false;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a6d5719bec3..7631861b6bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.parse;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.DYNAMICPARTITIONCONVERT;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEARCHIVEENABLED;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_DEFAULT_STORAGE_HANDLER;
@@ -7750,7 +7749,6 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
try {
if (tblDesc == null) {
if (viewDesc != null) {
- destinationTable = viewDesc.toTable(conf);
tableDescriptor = PlanUtils.getTableDesc(viewDesc, cols, colTypes);
} else if (qb.getIsQuery()) {
Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
@@ -7768,28 +7766,16 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
}
tableDescriptor = PlanUtils.getDefaultQueryOutputTableDesc(cols,
colTypes, fileFormat,
serdeClass);
- destinationTable = null;
} else {
tableDescriptor =
PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
}
} else {
- destinationTable =
db.getTranslateTableDryrun(tblDesc.toTable(conf).getTTable());
- if (ctx.isExplainPlan() &&
- tblDesc.getTblProps().containsKey(TABLE_IS_CTAS) &&
- !tblDesc.getTblProps().containsKey(META_TABLE_LOCATION)) {
- if (destinationTable.getDataLocation() == null) {
- // no metastore.metadata.transformer.class was set
- tblDesc.getTblProps().put(EXPLAIN_CTAS_LOCATION, new
Warehouse(conf).getDefaultTablePath(
- destinationTable.getDbName(),
- destinationTable.getTableName(),
-
Boolean.parseBoolean(destinationTable.getParameters().get("EXTERNAL"))).toString());
- } else {
- tblDesc.getTblProps().put(EXPLAIN_CTAS_LOCATION,
destinationTable.getDataLocation().toString());
- }
+ if (tblDesc.isCTAS() && tblDesc.getStorageHandler() != null) {
+ tblDesc.setLocation(getCtasOrCMVLocation(tblDesc, viewDesc,
createTableUseSuffix).toString());
}
tableDescriptor = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
}
- } catch (HiveException | MetaException e) {
+ } catch (HiveException e) {
throw new SemanticException(e);
}
@@ -7810,6 +7796,16 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
boolean isDfsDir = (destType == QBMetaData.DEST_DFS_FILE);
+ try {
+ if (tblDesc != null) {
+ destinationTable =
db.getTranslateTableDryrun(tblDesc.toTable(conf).getTTable());
+ } else {
+ destinationTable = viewDesc != null ? viewDesc.toTable(conf) : null;
+ }
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+
destTableIsFullAcid = AcidUtils.isFullAcidTable(destinationTable);
// Data organization (DISTRIBUTED, SORTED, CLUSTERED) for materialized
view
@@ -14041,9 +14037,6 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
isTransactional, isManaged, new String[]{qualifiedTabName.getDb(),
qualifiedTabName.getTable()}, isDefaultTableTypeChanged);
isExt = isExternalTableChanged(tblProps, isTransactional, isExt,
isDefaultTableTypeChanged);
tblProps.put(TABLE_IS_CTAS, "true");
- if (ctx.isExplainPlan()) {
- tblProps.put(EXPLAIN_CTAS_LOCATION, "");
- }
addDbAndTabToOutputs(new String[] {qualifiedTabName.getDb(),
qualifiedTabName.getTable()},
TableType.MANAGED_TABLE, isTemporary, tblProps, storageFormat);
tableDesc = new CreateTableDesc(qualifiedTabName, isExt, isTemporary,
cols,
@@ -14094,7 +14087,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
WriteType lockType = tblProps != null &&
Boolean.parseBoolean(tblProps.get(TABLE_IS_CTAS))
&& AcidUtils.isExclusiveCTASEnabled(conf)
// iceberg CTAS has its own locking mechanism, therefore we should
exclude them
- && (t.getStorageHandler() == null ||
!t.getStorageHandler().directInsertCTAS()) ?
+ && (t.getStorageHandler() == null ||
!t.getStorageHandler().directInsert()) ?
WriteType.CTAS : WriteType.DDL_NO_LOCK;
outputs.add(new WriteEntity(t, lockType));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 193af2d695c..3298156d6ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -136,11 +136,11 @@ public abstract class TaskCompiler {
boolean isCStats = pCtx.getQueryProperties().isAnalyzeRewrite();
int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();
- boolean directInsertCtas = false;
+ boolean directInsert = false;
if (pCtx.getCreateTable() != null &&
pCtx.getCreateTable().getStorageHandler() != null) {
try {
- directInsertCtas =
- HiveUtils.getStorageHandler(conf,
pCtx.getCreateTable().getStorageHandler()).directInsertCTAS();
+ directInsert =
+ HiveUtils.getStorageHandler(conf,
pCtx.getCreateTable().getStorageHandler()).directInsert();
} catch (HiveException e) {
throw new SemanticException("Failed to load storage handler: " +
e.getMessage());
}
@@ -303,6 +303,22 @@ public abstract class TaskCompiler {
setInputFormat(rootTask);
}
+ if (directInsert) {
+ Task<?> crtTask = null;
+ if (pCtx.getCreateTable() != null) {
+ CreateTableDesc crtTblDesc = pCtx.getCreateTable();
+ crtTblDesc.validate(conf);
+ crtTask = TaskFactory.get(new DDLWork(inputs, outputs, crtTblDesc));
+ }
+ if (crtTask != null) {
+ for (Task<?> rootTask : rootTasks) {
+ crtTask.addDependentTask(rootTask);
+ rootTasks.clear();
+ rootTasks.add(crtTask);
+ }
+ }
+ }
+
optimizeTaskPlan(rootTasks, pCtx, ctx);
/*
@@ -373,14 +389,14 @@ public abstract class TaskCompiler {
// for direct insert CTAS, we don't need this table creation DDL task,
since the table will be created
// ahead of time by the non-native table
- if (pCtx.getQueryProperties().isCTAS() &&
!pCtx.getCreateTable().isMaterialization() && !directInsertCtas) {
+ if (pCtx.getQueryProperties().isCTAS() &&
!pCtx.getCreateTable().isMaterialization() && !directInsert) {
// generate a DDL task and make it a dependent task of the leaf
CreateTableDesc crtTblDesc = pCtx.getCreateTable();
crtTblDesc.validate(conf);
Task<?> crtTblTask = TaskFactory.get(new DDLWork(inputs, outputs,
crtTblDesc));
patchUpAfterCTASorMaterializedView(rootTasks, inputs, outputs,
crtTblTask,
CollectionUtils.isEmpty(crtTblDesc.getPartColNames()));
- } else if (pCtx.getQueryProperties().isMaterializedView()) {
+ } else if (pCtx.getQueryProperties().isMaterializedView() &&
!directInsert) {
// generate a DDL task and make it a dependent task of the leaf
CreateMaterializedViewDesc viewDesc = pCtx.getCreateViewDesc();
Task<?> crtViewTask = TaskFactory.get(new DDLWork(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 80bab489620..947250cd815 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.plan;
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
import static
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
import static org.apache.hive.common.util.HiveStringUtils.quoteComments;
@@ -1222,7 +1221,7 @@ public final class PlanUtils {
return LazySimpleSerDe.class;
}
- private static final String[] FILTER_OUT_FROM_EXPLAIN = {TABLE_IS_CTAS,
EXPLAIN_CTAS_LOCATION};
+ private static final String[] FILTER_OUT_FROM_EXPLAIN = {TABLE_IS_CTAS};
/**
* Get a Map of table or partition properties to be used in explain extended
output.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 12366eff289..edc8e6fdeca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -37,8 +37,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
-import static org.apache.hadoop.hive.conf.Constants.EXPLAIN_CTAS_LOCATION;
-
/**
* TableDesc.
*
@@ -139,15 +137,11 @@ public class TableDesc implements Serializable, Cloneable
{
this.jobProperties = jobProperties;
}
+ @Explain(displayName = "jobProperties", explainLevels = { Level.EXTENDED })
public Map<String, String> getJobProperties() {
return jobProperties;
}
- @Explain(displayName = "jobProperties", explainLevels = { Level.EXTENDED })
- public Map<String, String> getJobPropertiesExplain() {
- return PlanUtils.getPropertiesForExplain(jobProperties,
EXPLAIN_CTAS_LOCATION);
- }
-
public void setJobSecrets(Map<String, String> jobSecrets) {
this.jobSecrets = jobSecrets;
}