This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 79f2a220c3 HIVE-26264: Iceberg integration: Fetch virtual columns on demand (Krisztian Kasa, reviewed by Peter Vary)
79f2a220c3 is described below
commit 79f2a220c3a0e5303ed89c2054ab277a83381448
Author: Krisztian Kasa <[email protected]>
AuthorDate: Fri Jun 3 06:13:27 2022 +0200
HIVE-26264: Iceberg integration: Fetch virtual columns on demand (Krisztian Kasa, reviewed by Peter Vary)
---
.../org/apache/iceberg/mr/InputFormatConfig.java | 5 ++
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 4 ++
.../mr/hive/HiveIcebergOutputCommitter.java | 3 +-
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 3 +-
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 3 +-
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 16 ++----
.../apache/iceberg/mr/hive/IcebergAcidUtil.java | 62 +++++++++++++++++++++-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 50 ++++++-----------
.../queries/positive/query_iceberg_virtualcol.q | 9 ++++
.../llap/vectorized_iceberg_read_mixed.q.out | 58 +++++++++++---------
.../llap/vectorized_iceberg_read_orc.q.out | 58 +++++++++++---------
.../llap/vectorized_iceberg_read_parquet.q.out | 58 +++++++++++---------
.../positive/query_iceberg_virtualcol.q.out | 54 +++++++++++++++++++
.../positive/vectorized_iceberg_read_mixed.q.out | 30 ++++++-----
.../positive/vectorized_iceberg_read_orc.q.out | 30 ++++++-----
.../positive/vectorized_iceberg_read_parquet.q.out | 30 ++++++-----
.../org/apache/hadoop/hive/ql/exec/FetchTask.java | 4 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 2 +
.../apache/hadoop/hive/ql/exec/MapOperator.java | 4 +-
.../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 4 +-
.../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 4 +-
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 19 +++++--
.../hive/ql/io/parquet/ProjectionPusher.java | 2 +-
.../hive/ql/metadata/HiveStorageHandler.java | 3 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 32 ++++++-----
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 3 --
.../apache/hadoop/hive/ql/plan/FileSinkDesc.java | 12 ++++-
.../HiveCustomStorageHandlerUtils.java | 23 ++++++++
.../hadoop/hive/serde2/ColumnProjectionUtils.java | 4 +-
29 files changed, 393 insertions(+), 196 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 4bcddf3ef7..b7a02ed365 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -77,6 +77,7 @@ public class InputFormatConfig {
public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
+ public static final String FETCH_VIRTUAL_COLUMNS =
"iceberg.mr.fetch.virtual.columns";
public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
public static final String CONFIG_SERIALIZATION_DISABLED =
"iceberg.mr.config.serialization.disabled";
@@ -235,6 +236,10 @@ public class InputFormatConfig {
return readColumns.split(conf.get(serdeConstants.COLUMN_NAME_DELIMITER,
String.valueOf(SerDeUtils.COMMA)));
}
+ public static boolean fetchVirtualColumns(Configuration conf) {
+ return conf.getBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS, false);
+ }
+
/**
* Get Hadoop config key of a catalog property based on catalog name
* @param catalogName catalog name
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 38c75fad94..23d76d4260 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -134,6 +134,8 @@ public class HiveIcebergInputFormat extends
MapredIcebergInputFormat<Record>
}
job.set(InputFormatConfig.SELECTED_COLUMNS,
job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
+ job.setBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS,
+
job.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false));
job.set(InputFormatConfig.AS_OF_TIMESTAMP,
job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
job.set(InputFormatConfig.SNAPSHOT_ID,
job.get(TableScanDesc.AS_OF_VERSION, "-1"));
@@ -147,6 +149,8 @@ public class HiveIcebergInputFormat extends
MapredIcebergInputFormat<Record>
public RecordReader<Void, Container<Record>> getRecordReader(InputSplit
split, JobConf job,
Reporter
reporter) throws IOException {
job.set(InputFormatConfig.SELECTED_COLUMNS,
job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
+ job.setBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS,
+
job.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false));
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
&& Utilities.getIsVectorized(job)) {
Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(),
"Vectorization only supported for Hive 3+");
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 825ee3dc39..52507cf65c 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import
org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
@@ -338,7 +339,7 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
if (writeResults.isEmpty()) {
LOG.info(
"Not creating a new commit for table: {}, jobID: {}, operation:
{}, since there were no new files to add",
- table, jobContext.getJobID(),
HiveIcebergStorageHandler.operation(conf, name));
+ table, jobContext.getJobID(),
HiveCustomStorageHandlerUtils.getWriteOperation(conf, name));
} else {
commitWrite(table, startTime, writeResults);
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 7bf1b1ec0d..00fd9db2f0 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import
org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -72,7 +73,7 @@ public class HiveIcebergOutputFormat<T> implements
OutputFormat<NullWritable, Co
.tableName(tableName)
.attemptID(taskAttemptID)
.poolSize(poolSize)
- .operation(HiveIcebergStorageHandler.operation(jc, tableName))
+ .operation(HiveCustomStorageHandlerUtils.getWriteOperation(jc,
tableName))
.build();
}
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 99820583a1..4c9eeb8f8d 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
+import
org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -145,7 +146,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
private static Schema projectedSchema(Configuration configuration, String
tableName, Schema tableSchema) {
- Context.Operation operation =
HiveIcebergStorageHandler.operation(configuration, tableName);
+ Context.Operation operation =
HiveCustomStorageHandlerUtils.getWriteOperation(configuration, tableName);
if (operation != null) {
switch (operation) {
case DELETE:
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index c79d344272..a6b5c6deba 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -353,11 +353,12 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
@Override
- public DynamicPartitionCtx createDPContext(HiveConf hiveConf,
org.apache.hadoop.hive.ql.metadata.Table hmsTable)
+ public DynamicPartitionCtx createDPContext(
+ HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table
hmsTable, Operation writeOperation)
throws SemanticException {
// delete records are already clustered by partition spec id and the hash
of the partition struct
// there is no need to do any additional sorting based on partition columns
- if (getOperationType().equals(Operation.DELETE.name())) {
+ if (writeOperation == Operation.DELETE) {
return null;
}
@@ -382,7 +383,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
fieldOrderMap.put(fields.get(i).name(), i);
}
- int offset = acidSelectColumns(hmsTable,
Operation.valueOf(getOperationType())).size();
+ int offset = acidSelectColumns(hmsTable, writeOperation).size();
for (PartitionTransformSpec spec : partitionTransformSpecs) {
int order = fieldOrderMap.get(spec.getColumnName());
if
(PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
@@ -573,15 +574,6 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
return false;
}
- public static Operation operation(Configuration conf, String tableName) {
- if (conf == null || tableName == null) {
- return null;
- }
-
- String operation = conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX +
tableName);
- return operation == null ? null : Operation.valueOf(operation);
- }
-
/**
* Returns the Table serialized to the configuration based on the table name.
* If configuration is missing from the FileIO of the table, it will be
populated with the input config.
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index 73fe9743e7..6b4bca01e2 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -19,15 +19,21 @@
package org.apache.iceberg.mr.hive;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -40,12 +46,18 @@ public class IcebergAcidUtil {
private static final Types.NestedField PARTITION_STRUCT_META_COL = null; //
placeholder value in the map
private static final Map<Types.NestedField, Integer> FILE_READ_META_COLS =
Maps.newLinkedHashMap();
+ private static final Map<String, Types.NestedField>
VIRTUAL_COLS_TO_META_COLS = Maps.newLinkedHashMap();
static {
FILE_READ_META_COLS.put(MetadataColumns.SPEC_ID, 0);
FILE_READ_META_COLS.put(PARTITION_STRUCT_META_COL, 1);
FILE_READ_META_COLS.put(MetadataColumns.FILE_PATH, 2);
FILE_READ_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
+
+ VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.PARTITION_SPEC_ID.getName(),
MetadataColumns.SPEC_ID);
+ VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.PARTITION_HASH.getName(),
PARTITION_STRUCT_META_COL);
+ VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.FILE_PATH.getName(),
MetadataColumns.FILE_PATH);
+ VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROW_POSITION.getName(),
MetadataColumns.ROW_POSITION);
}
private static final Types.NestedField PARTITION_HASH_META_COL =
Types.NestedField.required(
@@ -62,9 +74,9 @@ public class IcebergAcidUtil {
/**
* @param dataCols The columns of the original file read schema
* @param table The table object - it is used for populating the partition
struct meta column
- * @return The schema for reading files, extended with metadata columns
needed for deletes
+ * @return The schema for reading files, extended with metadata columns
*/
- public static Schema createFileReadSchemaForDelete(List<Types.NestedField>
dataCols, Table table) {
+ public static Schema
createFileReadSchemaWithVirtualColums(List<Types.NestedField> dataCols, Table
table) {
List<Types.NestedField> cols =
Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
FILE_READ_META_COLS.forEach((metaCol, index) -> {
if (metaCol == PARTITION_STRUCT_META_COL) {
@@ -201,4 +213,50 @@ public class IcebergAcidUtil {
}
return partHash;
}
+
+ public static void copyFields(GenericRecord source, int start, int len,
GenericRecord target) {
+ for (int sourceIdx = start, targetIdx = 0; targetIdx < len; ++sourceIdx,
++targetIdx) {
+ target.set(targetIdx, source.get(sourceIdx));
+ }
+ }
+
+ public static class VirtualColumnAwareIterator<T> implements
CloseableIterator<T> {
+
+ private final CloseableIterator<T> currentIterator;
+
+ private GenericRecord current;
+ private final Schema expectedSchema;
+ private final Configuration conf;
+
+ public VirtualColumnAwareIterator(CloseableIterator<T> currentIterator,
Schema expectedSchema, Configuration conf) {
+ this.currentIterator = currentIterator;
+ this.expectedSchema = expectedSchema;
+ this.conf = conf;
+ current = GenericRecord.create(
+ new Schema(expectedSchema.columns().subList(4,
expectedSchema.columns().size())));
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentIterator.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentIterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ T next = currentIterator.next();
+ GenericRecord rec = (GenericRecord) next;
+ PositionDeleteInfo.setIntoConf(conf,
+ IcebergAcidUtil.parseSpecId(rec),
+ IcebergAcidUtil.computePartitionHash(rec),
+ IcebergAcidUtil.parseFilePath(rec),
+ IcebergAcidUtil.parseFilePosition(rec));
+ IcebergAcidUtil.copyFields(rec, FILE_READ_META_COLS.size(),
current.size(), current);
+ return (T) current;
+ }
+ }
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 0ccb750fda..7bbd03a09e 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -32,8 +32,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
-import org.apache.hadoop.hive.ql.Context.Operation;
-import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -61,7 +59,6 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.GenericDeleteFilter;
-import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.avro.DataReader;
@@ -241,7 +238,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
private T current;
private CloseableIterator<T> currentIterator;
private Table table;
- private boolean updateOrDelete;
+ private boolean fetchVirtualColumns;
@Override
public void initialize(InputSplit split, TaskAttemptContext newContext) {
@@ -258,9 +255,16 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
this.reuseContainers =
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
this.inMemoryDataModel =
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
- this.currentIterator = open(tasks.next(), expectedSchema).iterator();
- Operation operation = HiveIcebergStorageHandler.operation(conf,
conf.get(Catalogs.NAME));
- this.updateOrDelete = Operation.DELETE.equals(operation) ||
Operation.UPDATE.equals(operation);
+ this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+ this.currentIterator = nextTask();
+ }
+
+ private CloseableIterator<T> nextTask() {
+ CloseableIterator<T> closeableIterator = open(tasks.next(),
expectedSchema).iterator();
+ if (!fetchVirtualColumns) {
+ return closeableIterator;
+ }
+ return new
IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator,
expectedSchema, conf);
}
@Override
@@ -268,18 +272,10 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
while (true) {
if (currentIterator.hasNext()) {
current = currentIterator.next();
- if (updateOrDelete) {
- GenericRecord rec = (GenericRecord) current;
- PositionDeleteInfo.setIntoConf(conf,
- IcebergAcidUtil.parseSpecId(rec),
- IcebergAcidUtil.computePartitionHash(rec),
- IcebergAcidUtil.parseFilePath(rec),
- IcebergAcidUtil.parseFilePosition(rec));
- }
return true;
} else if (tasks.hasNext()) {
currentIterator.close();
- currentIterator = open(tasks.next(), expectedSchema).iterator();
+ this.currentIterator = nextTask();
} else {
currentIterator.close();
return false;
@@ -511,24 +507,11 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
readSchema = caseSensitive ? table.schema().select(selectedColumns) :
table.schema().caseInsensitiveSelect(selectedColumns);
- Operation operation = HiveIcebergStorageHandler.operation(conf,
conf.get(Catalogs.NAME));
- if (operation != null) {
- switch (operation) {
- case DELETE:
- // for DELETE queries, add additional metadata columns into the
read schema
- return
IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
- case UPDATE:
- // for UPDATE queries, add additional metadata columns into the
read schema
- return
IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
- case OTHER:
- // for INSERT queries no extra columns are needed
- return readSchema;
- default:
- throw new IllegalArgumentException("Not supported operation " +
operation);
- }
- } else {
- return readSchema;
+ if (InputFormatConfig.fetchVirtualColumns(conf)) {
+ return
IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(),
table);
}
+
+ return readSchema;
}
private static Schema schemaWithoutConstantsAndMeta(Schema readSchema,
Map<Integer, ?> idToConstant) {
@@ -548,5 +531,4 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
return TypeUtil.selectNot(readSchema, collect);
}
}
-
}
diff --git
a/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_virtualcol.q
b/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_virtualcol.q
new file mode 100644
index 0000000000..bf27fa291b
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_virtualcol.q
@@ -0,0 +1,9 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string) partitioned by (c int) stored
by iceberg stored as orc tblproperties ('format-version'='2');
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 50), (3, 'three', 50),
(4, 'four', 52), (5, 'five', 54), (111, 'one', 52), (333, 'two', 56);
+
+select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH,
tbl_ice.ROW__POSITION from tbl_ice
+order by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc;
+
+select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH,
tbl_ice.ROW__POSITION from tbl_ice
+sort by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc;
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out
b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out
index b3d9d7f5e8..e173ea1c84 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out
@@ -69,20 +69,24 @@ STAGE PLANS:
TableScan
alias: tbl_ice_mixed
Statistics: Num rows: 19 Data size: 1748 Basic stats:
COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: max(a)
- keys: b (type: string)
- minReductionHashAggr: 0.4736842
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 10 Data size: 920 Basic stats:
COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- null sort order: z
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Select Operator
+ expressions: a (type: int), b (type: string)
+ outputColumnNames: a, b
+ Statistics: Num rows: 19 Data size: 1748 Basic stats:
COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: max(a)
+ keys: b (type: string)
+ minReductionHashAggr: 0.4736842
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 10 Data size: 920 Basic stats:
COMPLETE Column stats: COMPLETE
- value expressions: _col1 (type: int)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 10 Data size: 920 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col1 (type: int)
Execution mode: vectorized, llap
LLAP IO: all inputs (cache only)
Reducer 2
@@ -207,20 +211,24 @@ STAGE PLANS:
TableScan
alias: tbl_ice_mixed_all_types
Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE
Column stats: COMPLETE
- Group By Operator
- aggregations: max(t_float)
- keys: t_double (type: double), t_boolean (type: boolean),
t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string
(type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal
(type: decimal(4,2))
- minReductionHashAggr: 0.99
- mode: hash
- outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7, _col8, _col9
+ Select Operator
+ expressions: t_float (type: float), t_double (type:
double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint),
t_binary (type: binary), t_string (type: string), t_timestamp (type:
timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+ outputColumnNames: t_float, t_double, t_boolean, t_int,
t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
Statistics: Num rows: 2 Data size: 746 Basic stats:
COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: double), _col1 (type:
boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5
(type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type:
decimal(4,2))
- null sort order: zzzzzzzzz
- sort order: +++++++++
- Map-reduce partition columns: _col0 (type: double),
_col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type:
binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date),
_col8 (type: decimal(4,2))
+ Group By Operator
+ aggregations: max(t_float)
+ keys: t_double (type: double), t_boolean (type:
boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary),
t_string (type: string), t_timestamp (type: timestamp), t_date (type: date),
t_decimal (type: decimal(4,2))
+ minReductionHashAggr: 0.99
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7, _col8, _col9
Statistics: Num rows: 2 Data size: 746 Basic stats:
COMPLETE Column stats: COMPLETE
- value expressions: _col9 (type: float)
+ Reduce Output Operator
+ key expressions: _col0 (type: double), _col1 (type:
boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5
(type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type:
decimal(4,2))
+ null sort order: zzzzzzzzz
+ sort order: +++++++++
+ Map-reduce partition columns: _col0 (type: double),
_col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type:
binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date),
_col8 (type: decimal(4,2))
+ Statistics: Num rows: 2 Data size: 746 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col9 (type: float)
Execution mode: vectorized, llap
LLAP IO: all inputs (cache only)
Reducer 2
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out
index f1207fbec1..be5c780551 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out
@@ -53,20 +53,24 @@ STAGE PLANS:
TableScan
alias: tbl_ice_orc
Statistics: Num rows: 10 Data size: 920 Basic stats:
COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: max(a)
- keys: b (type: string)
- minReductionHashAggr: 0.5
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 5 Data size: 460 Basic stats:
COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- null sort order: z
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Select Operator
+ expressions: a (type: int), b (type: string)
+ outputColumnNames: a, b
+ Statistics: Num rows: 10 Data size: 920 Basic stats:
COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: max(a)
+ keys: b (type: string)
+ minReductionHashAggr: 0.5
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 5 Data size: 460 Basic stats:
COMPLETE Column stats: COMPLETE
- value expressions: _col1 (type: int)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 5 Data size: 460 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col1 (type: int)
Execution mode: vectorized, llap
LLAP IO: all inputs (cache only)
Reducer 2
@@ -170,20 +174,24 @@ STAGE PLANS:
TableScan
alias: tbl_ice_orc_all_types
Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE
Column stats: COMPLETE
- Group By Operator
- aggregations: max(t_float)
- keys: t_double (type: double), t_boolean (type: boolean),
t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string
(type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal
(type: decimal(4,2))
- minReductionHashAggr: 0.99
- mode: hash
- outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7, _col8, _col9
+ Select Operator
+ expressions: t_float (type: float), t_double (type:
double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint),
t_binary (type: binary), t_string (type: string), t_timestamp (type:
timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+ outputColumnNames: t_float, t_double, t_boolean, t_int,
t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
Statistics: Num rows: 1 Data size: 372 Basic stats:
COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: double), _col1 (type:
boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5
(type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type:
decimal(4,2))
- null sort order: zzzzzzzzz
- sort order: +++++++++
- Map-reduce partition columns: _col0 (type: double),
_col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type:
binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date),
_col8 (type: decimal(4,2))
+ Group By Operator
+ aggregations: max(t_float)
+ keys: t_double (type: double), t_boolean (type:
boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary),
t_string (type: string), t_timestamp (type: timestamp), t_date (type: date),
t_decimal (type: decimal(4,2))
+ minReductionHashAggr: 0.99
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7, _col8, _col9
Statistics: Num rows: 1 Data size: 372 Basic stats:
COMPLETE Column stats: COMPLETE
- value expressions: _col9 (type: float)
+ Reduce Output Operator
+ key expressions: _col0 (type: double), _col1 (type:
boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5
(type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type:
decimal(4,2))
+ null sort order: zzzzzzzzz
+ sort order: +++++++++
+ Map-reduce partition columns: _col0 (type: double),
_col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type:
binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date),
_col8 (type: decimal(4,2))
+ Statistics: Num rows: 1 Data size: 372 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col9 (type: float)
Execution mode: vectorized, llap
LLAP IO: all inputs (cache only)
Reducer 2
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out
b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out
index e1d6dfa18c..358586d43b 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out
@@ -53,20 +53,24 @@ STAGE PLANS:
TableScan
alias: tbl_ice_parquet
Statistics: Num rows: 10 Data size: 920 Basic stats:
COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: max(a)
- keys: b (type: string)
- minReductionHashAggr: 0.5
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 5 Data size: 460 Basic stats:
COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- null sort order: z
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Select Operator
+ expressions: a (type: int), b (type: string)
+ outputColumnNames: a, b
+ Statistics: Num rows: 10 Data size: 920 Basic stats:
COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: max(a)
+ keys: b (type: string)
+ minReductionHashAggr: 0.5
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 5 Data size: 460 Basic stats:
COMPLETE Column stats: COMPLETE
- value expressions: _col1 (type: int)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 5 Data size: 460 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col1 (type: int)
Execution mode: vectorized, llap
LLAP IO: all inputs (cache only)
Reducer 2
@@ -170,20 +174,24 @@ STAGE PLANS:
TableScan
alias: tbl_ice_parquet_all_types
Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE
Column stats: COMPLETE
- Group By Operator
- aggregations: max(t_float)
- keys: t_double (type: double), t_boolean (type: boolean),
t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string
(type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal
(type: decimal(4,2))
- minReductionHashAggr: 0.99
- mode: hash
- outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7, _col8, _col9
+ Select Operator
+ expressions: t_float (type: float), t_double (type:
double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint),
t_binary (type: binary), t_string (type: string), t_timestamp (type:
timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+ outputColumnNames: t_float, t_double, t_boolean, t_int,
t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
Statistics: Num rows: 1 Data size: 372 Basic stats:
COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: double), _col1 (type:
boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5
(type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type:
decimal(4,2))
- null sort order: zzzzzzzzz
- sort order: +++++++++
- Map-reduce partition columns: _col0 (type: double),
_col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type:
binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date),
_col8 (type: decimal(4,2))
+ Group By Operator
+ aggregations: max(t_float)
+ keys: t_double (type: double), t_boolean (type:
boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary),
t_string (type: string), t_timestamp (type: timestamp), t_date (type: date),
t_decimal (type: decimal(4,2))
+ minReductionHashAggr: 0.99
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7, _col8, _col9
Statistics: Num rows: 1 Data size: 372 Basic stats:
COMPLETE Column stats: COMPLETE
- value expressions: _col9 (type: float)
+ Reduce Output Operator
+ key expressions: _col0 (type: double), _col1 (type:
boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5
(type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type:
decimal(4,2))
+ null sort order: zzzzzzzzz
+ sort order: +++++++++
+ Map-reduce partition columns: _col0 (type: double),
_col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type:
binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date),
_col8 (type: decimal(4,2))
+ Statistics: Num rows: 1 Data size: 372 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col9 (type: float)
Execution mode: vectorized, llap
LLAP IO: all inputs (cache only)
Reducer 2
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_virtualcol.q.out
b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_virtualcol.q.out
new file mode 100644
index 0000000000..be0ec022fa
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_virtualcol.q.out
@@ -0,0 +1,54 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string) partitioned by
(c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string) partitioned by
(c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 50), (3,
'three', 50), (4, 'four', 52), (5, 'five', 54), (111, 'one', 52), (333, 'two',
56)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 50),
(3, 'three', 50), (4, 'four', 52), (5, 'five', 54), (111, 'one', 52), (333,
'two', 56)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID,
tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+order by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID,
tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+order by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+3 50 0 81 2
+2 50 0 81 1
+1 50 0 81 0
+111 52 0 83 1
+4 52 0 83 0
+5 54 0 85 0
+333 56 0 87 0
+PREHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID,
tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+sort by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID,
tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+sort by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+3 50 0 81 2
+2 50 0 81 1
+1 50 0 81 0
+111 52 0 83 1
+4 52 0 83 0
+5 54 0 85 0
+333 56 0 87 0
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out
b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out
index e6d990caa4..2701494205 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out
@@ -62,16 +62,18 @@ Stage-0
limit:-1
Stage-1
Reducer 2 vectorized
- File Output Operator [FS_10]
- Group By Operator [GBY_9] (rows=10 width=92)
+ File Output Operator [FS_11]
+ Group By Operator [GBY_10] (rows=10 width=92)
Output:["_col0","_col1"],aggregations:["max(VALUE._col0)"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_8]
+ SHUFFLE [RS_9]
PartitionCols:_col0
- Group By Operator [GBY_7] (rows=10 width=92)
+ Group By Operator [GBY_8] (rows=10 width=92)
Output:["_col0","_col1"],aggregations:["max(a)"],keys:b
- TableScan [TS_0] (rows=19 width=92)
-
default@tbl_ice_mixed,tbl_ice_mixed,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
+ Select Operator [SEL_7] (rows=19 width=92)
+ Output:["a","b"]
+ TableScan [TS_0] (rows=19 width=92)
+
default@tbl_ice_mixed,tbl_ice_mixed,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
PREHOOK: query: select b, max(a) from tbl_ice_mixed group by b
PREHOOK: type: QUERY
@@ -165,18 +167,20 @@ Stage-0
limit:-1
Stage-1
Reducer 2 vectorized
- File Output Operator [FS_11]
- Select Operator [SEL_10] (rows=2 width=373)
+ File Output Operator [FS_12]
+ Select Operator [SEL_11] (rows=2 width=373)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
- Group By Operator [GBY_9] (rows=2 width=373)
+ Group By Operator [GBY_10] (rows=2 width=373)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0,
KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7,
KEY._col8
<-Map 1 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_8]
+ SHUFFLE [RS_9]
PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6,
_col7, _col8
- Group By Operator [GBY_7] (rows=2 width=373)
+ Group By Operator [GBY_8] (rows=2 width=373)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double,
t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
- TableScan [TS_0] (rows=2 width=373)
-
default@tbl_ice_mixed_all_types,tbl_ice_mixed_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+ Select Operator [SEL_7] (rows=2 width=373)
+
Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+ TableScan [TS_0] (rows=2 width=373)
+
default@tbl_ice_mixed_all_types,tbl_ice_mixed_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
PREHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint,
t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_mixed_all_types
group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string,
t_timestamp, t_date, t_decimal
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out
index 25764be089..e1d25d5321 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out
@@ -46,16 +46,18 @@ Stage-0
limit:-1
Stage-1
Reducer 2 vectorized
- File Output Operator [FS_10]
- Group By Operator [GBY_9] (rows=5 width=92)
+ File Output Operator [FS_11]
+ Group By Operator [GBY_10] (rows=5 width=92)
Output:["_col0","_col1"],aggregations:["max(VALUE._col0)"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_8]
+ SHUFFLE [RS_9]
PartitionCols:_col0
- Group By Operator [GBY_7] (rows=5 width=92)
+ Group By Operator [GBY_8] (rows=5 width=92)
Output:["_col0","_col1"],aggregations:["max(a)"],keys:b
- TableScan [TS_0] (rows=10 width=92)
-
default@tbl_ice_orc,tbl_ice_orc,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
+ Select Operator [SEL_7] (rows=10 width=92)
+ Output:["a","b"]
+ TableScan [TS_0] (rows=10 width=92)
+
default@tbl_ice_orc,tbl_ice_orc,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
PREHOOK: query: select b, max(a) from tbl_ice_orc group by b
PREHOOK: type: QUERY
@@ -128,18 +130,20 @@ Stage-0
limit:-1
Stage-1
Reducer 2 vectorized
- File Output Operator [FS_11]
- Select Operator [SEL_10] (rows=1 width=372)
+ File Output Operator [FS_12]
+ Select Operator [SEL_11] (rows=1 width=372)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
- Group By Operator [GBY_9] (rows=1 width=372)
+ Group By Operator [GBY_10] (rows=1 width=372)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0,
KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7,
KEY._col8
<-Map 1 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_8]
+ SHUFFLE [RS_9]
PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6,
_col7, _col8
- Group By Operator [GBY_7] (rows=1 width=372)
+ Group By Operator [GBY_8] (rows=1 width=372)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double,
t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
- TableScan [TS_0] (rows=1 width=372)
-
default@tbl_ice_orc_all_types,tbl_ice_orc_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+ Select Operator [SEL_7] (rows=1 width=372)
+
Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+ TableScan [TS_0] (rows=1 width=372)
+
default@tbl_ice_orc_all_types,tbl_ice_orc_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
PREHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint,
t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types
group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string,
t_timestamp, t_date, t_decimal
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out
b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out
index 201a5efa4a..1a1782d7dd 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out
@@ -46,16 +46,18 @@ Stage-0
limit:-1
Stage-1
Reducer 2 vectorized
- File Output Operator [FS_10]
- Group By Operator [GBY_9] (rows=5 width=92)
+ File Output Operator [FS_11]
+ Group By Operator [GBY_10] (rows=5 width=92)
Output:["_col0","_col1"],aggregations:["max(VALUE._col0)"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_8]
+ SHUFFLE [RS_9]
PartitionCols:_col0
- Group By Operator [GBY_7] (rows=5 width=92)
+ Group By Operator [GBY_8] (rows=5 width=92)
Output:["_col0","_col1"],aggregations:["max(a)"],keys:b
- TableScan [TS_0] (rows=10 width=92)
-
default@tbl_ice_parquet,tbl_ice_parquet,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
+ Select Operator [SEL_7] (rows=10 width=92)
+ Output:["a","b"]
+ TableScan [TS_0] (rows=10 width=92)
+
default@tbl_ice_parquet,tbl_ice_parquet,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
PREHOOK: query: select b, max(a) from tbl_ice_parquet group by b
PREHOOK: type: QUERY
@@ -128,18 +130,20 @@ Stage-0
limit:-1
Stage-1
Reducer 2 vectorized
- File Output Operator [FS_11]
- Select Operator [SEL_10] (rows=1 width=372)
+ File Output Operator [FS_12]
+ Select Operator [SEL_11] (rows=1 width=372)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
- Group By Operator [GBY_9] (rows=1 width=372)
+ Group By Operator [GBY_10] (rows=1 width=372)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0,
KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7,
KEY._col8
<-Map 1 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_8]
+ SHUFFLE [RS_9]
PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6,
_col7, _col8
- Group By Operator [GBY_7] (rows=1 width=372)
+ Group By Operator [GBY_8] (rows=1 width=372)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double,
t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
- TableScan [TS_0] (rows=1 width=372)
-
default@tbl_ice_parquet_all_types,tbl_ice_parquet_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+ Select Operator [SEL_7] (rows=1 width=372)
+
Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+ TableScan [TS_0] (rows=1 width=372)
+
default@tbl_ice_parquet_all_types,tbl_ice_parquet_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
PREHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint,
t_binary, t_string, t_timestamp, t_date, t_decimal from
tbl_ice_parquet_all_types
group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string,
t_timestamp, t_date, t_decimal
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index a70a6356a0..64a9a1becd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -78,8 +78,8 @@ public class FetchTask extends Task<FetchWork> implements
Serializable {
if (source instanceof TableScanOperator) {
TableScanOperator ts = (TableScanOperator) source;
// push down projections
- ColumnProjectionUtils.appendReadColumns(
- job, ts.getNeededColumnIDs(), ts.getNeededColumns(),
ts.getNeededNestedColumnPaths());
+ ColumnProjectionUtils.appendReadColumns(job, ts.getNeededColumnIDs(),
ts.getNeededColumns(),
+ ts.getNeededNestedColumnPaths(),
ts.getConf().hasVirtualCols());
// push down filters and as of information
HiveInputFormat.pushFiltersAndAsOf(job, ts, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 91cf0c3c03..4204d920f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+import static
org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperation;
import java.io.IOException;
import java.io.Serializable;
@@ -616,6 +617,7 @@ public class FileSinkOperator extends
TerminalOperator<FileSinkDesc> implements
initializeSpecPath();
fs = specPath.getFileSystem(hconf);
+ setWriteOperation(hconf, getConf().getTableInfo().getTableName(),
getConf().getWriteOperation());
if (hconf instanceof JobConf) {
jc = (JobConf) hconf;
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 8c328234ad..1fdc03557e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -370,12 +370,14 @@ public class MapOperator extends AbstractMapOperator {
Configuration clonedConf = new Configuration(hconf);
clonedConf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+
clonedConf.unset(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR);
clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
tableNameToConf.put(tableName, clonedConf);
}
Configuration newConf = tableNameToConf.get(tableName);
ColumnProjectionUtils.appendReadColumns(newConf,
tableScanDesc.getNeededColumnIDs(),
- tableScanDesc.getOutputColumnNames(),
tableScanDesc.getNeededNestedColumnPaths());
+ tableScanDesc.getOutputColumnNames(),
tableScanDesc.getNeededNestedColumnPaths(),
+ tableScanDesc.hasVirtualCols());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index e58036a0bb..b3f167c3e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -202,8 +202,8 @@ public class SMBMapJoinOperator extends
AbstractMapJoinOperator<SMBJoinDesc> imp
TableScanOperator ts = (TableScanOperator)aliasToWork.get(alias);
// push down projections
- ColumnProjectionUtils.appendReadColumns(
- jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(),
ts.getNeededNestedColumnPaths());
+ ColumnProjectionUtils.appendReadColumns(jobClone,
ts.getNeededColumnIDs(), ts.getNeededColumns(),
+ ts.getNeededNestedColumnPaths(), ts.conf.hasVirtualCols());
// push down filters and as of information
HiveInputFormat.pushFiltersAndAsOf(jobClone, ts, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 0d4289a020..62b74dc842 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -477,8 +477,8 @@ public class MapredLocalTask extends Task<MapredLocalWork>
implements Serializab
TableScanOperator ts =
(TableScanOperator)work.getAliasToWork().get(entry.getKey());
// push down projections
- ColumnProjectionUtils.appendReadColumns(
- jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(),
ts.getNeededNestedColumnPaths());
+ ColumnProjectionUtils.appendReadColumns(jobClone,
ts.getNeededColumnIDs(), ts.getNeededColumns(),
+ ts.getNeededNestedColumnPaths(), ts.getConf().hasVirtualCols());
// push down filters and as of information
HiveInputFormat.pushFiltersAndAsOf(jobClone, ts, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 5a3def4d0d..403fb9ed01 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -786,6 +786,7 @@ public class HiveInputFormat<K extends WritableComparable,
V extends Writable>
get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, ""));;
StringBuilder readColumnNamesBuffer = new StringBuilder(newjob.
get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
+ boolean fetchVirtualColumns =
newjob.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false);
// for each dir, get the InputFormat, and do getSplits.
for (Path dir : dirs) {
PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
@@ -804,8 +805,10 @@ public class HiveInputFormat<K extends WritableComparable,
V extends Writable>
readColumnsBuffer.setLength(0);
readColumnNamesBuffer.setLength(0);
// push down projections.
- ColumnProjectionUtils.appendReadColumns(readColumnsBuffer,
readColumnNamesBuffer,
+ ColumnProjectionUtils.appendReadColumns(
+ readColumnsBuffer, readColumnNamesBuffer,
tableScan.getNeededColumnIDs(), tableScan.getNeededColumns());
+ fetchVirtualColumns = tableScan.getConf().hasVirtualCols();
pushDownProjection = true;
// push down filters and as of information
pushFiltersAndAsOf(newjob, tableScan, this.mrwork);
@@ -829,7 +832,7 @@ public class HiveInputFormat<K extends WritableComparable,
V extends Writable>
// set columns to read in conf
if (pushDownProjection) {
- pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer);
+ pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer,
fetchVirtualColumns);
}
addSplitsForGroup(currentDirs, currentTableScan, newjob,
@@ -847,7 +850,7 @@ public class HiveInputFormat<K extends WritableComparable,
V extends Writable>
// set columns to read in conf
if (pushDownProjection) {
- pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer);
+ pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer,
fetchVirtualColumns);
}
if (dirs.length != 0) { // TODO: should this be currentDirs?
@@ -865,15 +868,17 @@ public class HiveInputFormat<K extends
WritableComparable, V extends Writable>
}
private void pushProjection(final JobConf newjob, final StringBuilder
readColumnsBuffer,
- final StringBuilder readColumnNamesBuffer) {
+ final StringBuilder readColumnNamesBuffer, final boolean
fetchVirtualColumns) {
String readColIds = readColumnsBuffer.toString();
String readColNames = readColumnNamesBuffer.toString();
newjob.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
newjob.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIds);
newjob.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames);
+ newjob.setBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR,
fetchVirtualColumns);
LOG.info("{} = {}", ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
readColIds);
LOG.info("{} = {}", ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
readColNames);
+ LOG.info("{} = {}", ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR,
fetchVirtualColumns);
}
@@ -1042,7 +1047,11 @@ public class HiveInputFormat<K extends
WritableComparable, V extends Writable>
TableScanOperator ts = (TableScanOperator) op;
// push down projections.
ColumnProjectionUtils.appendReadColumns(
- jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns(),
ts.getNeededNestedColumnPaths());
+ jobConf,
+ ts.getNeededColumnIDs(),
+ ts.getNeededColumns(),
+ ts.getNeededNestedColumnPaths(),
+ ts.getConf().hasVirtualCols());
// push down filters and as of information
pushFiltersAndAsOf(jobConf, ts, this.mrwork);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index 6d525ba269..f138c1e254 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -164,7 +164,7 @@ public class ProjectionPusher {
private void pushFilters(final JobConf jobConf, RowSchema rowSchema,
ExprNodeGenericFuncDesc filterExpr) {
// construct column name list for reference by filter push down
- Utilities.setColumnNameList(jobConf, rowSchema);
+ Utilities.setColumnNameList(jobConf, rowSchema, true);
// push down filters
if (filterExpr == null) {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 8d75db400b..5dac8c0661 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -371,7 +371,8 @@ public interface HiveStorageHandler extends Configurable {
* @return the created DP context object, null if DP context / sorting is
not required
* @throws SemanticException
*/
- default DynamicPartitionCtx createDPContext(HiveConf conf,
org.apache.hadoop.hive.ql.metadata.Table table)
+ default DynamicPartitionCtx createDPContext(
+ HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table,
Operation writeOperation)
throws SemanticException {
Preconditions.checkState(alwaysUnpartitioned(), "Should only be called for
table formats where partitioning " +
"is not handled by Hive but the table format itself. See
alwaysUnpartitioned() method.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 618c60baa7..c1dc1feb02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8131,6 +8131,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
RowSchema fsRS, boolean canBeMerged,
Table dest_tab, Long mmWriteId, boolean isMmCtas,
Integer dest_type, QB qb, boolean
isDirectInsert, AcidUtils.Operation acidOperation, String moveTaskId) throws
SemanticException {
boolean isInsertOverwrite = false;
+ Context.Operation writeOperation = getWriteOperation(dest);
switch (dest_type) {
case QBMetaData.DEST_PARTITION:
//fall through
@@ -8146,7 +8147,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
// Some non-native tables might be partitioned without partition spec
information being present in the Table object
HiveStorageHandler storageHandler = dest_tab.getStorageHandler();
if (storageHandler != null && storageHandler.alwaysUnpartitioned()) {
- DynamicPartitionCtx nonNativeDpCtx =
storageHandler.createDPContext(conf, dest_tab);
+ DynamicPartitionCtx nonNativeDpCtx =
storageHandler.createDPContext(conf, dest_tab, writeOperation);
if (dpCtx == null && nonNativeDpCtx != null) {
dpCtx = nonNativeDpCtx;
}
@@ -8187,6 +8188,8 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
acidFileSinks.add(fileSinkDesc);
}
+ fileSinkDesc.setWriteOperation(writeOperation);
+
fileSinkDesc.setTemporary(destTableIsTemporary);
fileSinkDesc.setMaterialization(destTableIsMaterialization);
@@ -11478,20 +11481,19 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), alias,
true));
}
+ // put virtual columns into RowResolver.
List<VirtualColumn> vcList = new ArrayList<>();
- boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(tab);
- boolean isUpdateDelete = this instanceof UpdateDeleteSemanticAnalyzer;
- // put all virtual columns in RowResolver.
- if (!tab.isNonNative() || (nonNativeAcid && isUpdateDelete)) {
- vcList = VirtualColumn.getRegistry(conf);
- if (nonNativeAcid && isUpdateDelete) {
- vcList.addAll(tab.getStorageHandler().acidVirtualColumns());
- }
- vcList.forEach(vc -> rwsch.put(alias, vc.getName().toLowerCase(), new
ColumnInfo(vc.getName(),
- vc.getTypeInfo(), alias, true, vc.getIsHidden()
- )));
+ if (!tab.isNonNative()) {
+ vcList.addAll(VirtualColumn.getRegistry(conf));
+ }
+ if (tab.isNonNative() && AcidUtils.isNonNativeAcidTable(tab)) {
+ vcList.addAll(tab.getStorageHandler().acidVirtualColumns());
}
+ vcList.forEach(vc -> rwsch.put(alias, vc.getName().toLowerCase(), new
ColumnInfo(vc.getName(),
+ vc.getTypeInfo(), alias, true, vc.getIsHidden()
+ )));
+
// Create the root of the operator tree
TableScanDesc tsDesc = new TableScanDesc(alias, vcList, tab);
setupStats(tsDesc, qb.getParseInfo(), tab, alias, rwsch);
@@ -15018,6 +15020,12 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
AcidUtils.Operation.INSERT);
}
+ private Context.Operation getWriteOperation(String destination) {
+ return deleting(destination) ? Context.Operation.DELETE :
+ (updating(destination) ? Context.Operation.UPDATE :
+ Context.Operation.OTHER);
+ }
+
private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of,
String dest,
boolean isMM) {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 70121096a8..3b215a7acf 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -102,9 +102,6 @@ public class UpdateDeleteSemanticAnalyzer extends
RewriteSemanticAnalyzer {
validateTxnManager(mTable);
validateTargetTable(mTable);
- // save the operation type into the query state
- SessionStateUtil.addResource(conf,
Context.Operation.class.getSimpleName(), operation.name());
-
StringBuilder rewrittenQueryStr = new StringBuilder();
rewrittenQueryStr.append("insert into table ");
rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 67ae275363..f2742bdfe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.plan;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -27,6 +26,7 @@ import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
@@ -96,6 +96,7 @@ public class FileSinkDesc extends AbstractOperatorDesc
implements IStatsGatherDe
// Record what type of write this is. Default is non-ACID (ie old style).
private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
+ private Context.Operation writeOperation = Context.Operation.OTHER;
private long tableWriteId = 0; // table write id for this operation
private int statementId = -1;
private int maxStmtId = -1;
@@ -582,6 +583,15 @@ public class FileSinkDesc extends AbstractOperatorDesc
implements IStatsGatherDe
public AcidUtils.Operation getWriteType() {
return writeType;
}
+
+ public void setWriteOperation(Context.Operation writeOperation) {
+ this.writeOperation = writeOperation;
+ }
+
+ public Context.Operation getWriteOperation() {
+ return writeOperation;
+ }
+
@Explain(displayName = "Write Type")
public String getWriteTypeString() {
return getWriteType() == AcidUtils.Operation.NOT_ACID ? null :
getWriteType().toString();
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
index 0787daa7b7..8be4cfc5b8 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
@@ -20,12 +20,18 @@ package org.apache.hadoop.hive.ql.security.authorization;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.common.StatsSetupConst;
public class HiveCustomStorageHandlerUtils {
+ public static final String WRITE_OPERATION_CONFIG_PREFIX =
"file.sink.write.operation.";
+
+
public static String getTablePropsForCustomStorageHandler(Map<String,
String> tableProperties) {
StringBuilder properties = new StringBuilder();
for (Map.Entry<String,String> serdeMap : tableProperties.entrySet()) {
@@ -48,4 +54,21 @@ public class HiveCustomStorageHandlerUtils {
.ifPresent(tblProps::putAll);
return tblProps;
}
+
+ public static Context.Operation getWriteOperation(Configuration conf,
String tableName) {
+ if (conf == null || tableName == null) {
+ return null;
+ }
+
+ String operation = conf.get(WRITE_OPERATION_CONFIG_PREFIX + tableName);
+ return operation == null ? null : Context.Operation.valueOf(operation);
+ }
+
+ public static void setWriteOperation(Configuration conf, String tableName,
Context.Operation operation) {
+ if (conf == null || tableName == null) {
+ return;
+ }
+
+ conf.set(WRITE_OPERATION_CONFIG_PREFIX + tableName, operation.name());
+ }
}
diff --git
a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
index 70187b6122..5cd776a2e3 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
@@ -52,6 +52,7 @@ public final class ColumnProjectionUtils {
"hive.io.file.readNestedColumn.paths";
public static final String READ_ALL_COLUMNS =
"hive.io.file.read.all.columns";
public static final String READ_COLUMN_NAMES_CONF_STR =
"hive.io.file.readcolumn.names";
+ public static final String FETCH_VIRTUAL_COLUMNS_CONF_STR =
"hive.io.file.fetch.virtual.columns";
private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
private static final String READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT = "";
private static final boolean READ_ALL_COLUMNS_DEFAULT = true;
@@ -161,7 +162,7 @@ public final class ColumnProjectionUtils {
* @param names Column names.
*/
public static void appendReadColumns(
- Configuration conf, List<Integer> ids, List<String> names, List<String>
groupPaths) {
+ Configuration conf, List<Integer> ids, List<String> names, List<String>
groupPaths, boolean fetchVirtualCols) {
if (ids.size() != names.size()) {
LOG.warn("Read column counts do not match: "
+ ids.size() + " ids, " + names.size() + " names");
@@ -169,6 +170,7 @@ public final class ColumnProjectionUtils {
appendReadColumns(conf, ids);
appendReadColumnNames(conf, names);
appendNestedColumnPaths(conf, groupPaths);
+ conf.setBoolean(FETCH_VIRTUAL_COLUMNS_CONF_STR, fetchVirtualCols);
}
public static void appendReadColumns(