This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f2a220c3 HIVE-26264: Iceberg integration: Fetch virtual columns on demand (Krisztian Kasa, reviewed by Peter Vary)
79f2a220c3 is described below

commit 79f2a220c3a0e5303ed89c2054ab277a83381448
Author: Krisztian Kasa <[email protected]>
AuthorDate: Fri Jun 3 06:13:27 2022 +0200

    HIVE-26264: Iceberg integration: Fetch virtual columns on demand (Krisztian Kasa, reviewed by Peter Vary)
---
 .../org/apache/iceberg/mr/InputFormatConfig.java   |  5 ++
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |  4 ++
 .../mr/hive/HiveIcebergOutputCommitter.java        |  3 +-
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |  3 +-
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |  3 +-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 16 ++----
 .../apache/iceberg/mr/hive/IcebergAcidUtil.java    | 62 +++++++++++++++++++++-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 50 ++++++-----------
 .../queries/positive/query_iceberg_virtualcol.q    |  9 ++++
 .../llap/vectorized_iceberg_read_mixed.q.out       | 58 +++++++++++---------
 .../llap/vectorized_iceberg_read_orc.q.out         | 58 +++++++++++---------
 .../llap/vectorized_iceberg_read_parquet.q.out     | 58 +++++++++++---------
 .../positive/query_iceberg_virtualcol.q.out        | 54 +++++++++++++++++++
 .../positive/vectorized_iceberg_read_mixed.q.out   | 30 ++++++-----
 .../positive/vectorized_iceberg_read_orc.q.out     | 30 ++++++-----
 .../positive/vectorized_iceberg_read_parquet.q.out | 30 ++++++-----
 .../org/apache/hadoop/hive/ql/exec/FetchTask.java  |  4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  2 +
 .../apache/hadoop/hive/ql/exec/MapOperator.java    |  4 +-
 .../hadoop/hive/ql/exec/SMBMapJoinOperator.java    |  4 +-
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java    |  4 +-
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  | 19 +++++--
 .../hive/ql/io/parquet/ProjectionPusher.java       |  2 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  3 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 32 ++++++-----
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java     |  3 --
 .../apache/hadoop/hive/ql/plan/FileSinkDesc.java   | 12 ++++-
 .../HiveCustomStorageHandlerUtils.java             | 23 ++++++++
 .../hadoop/hive/serde2/ColumnProjectionUtils.java  |  4 +-
 29 files changed, 393 insertions(+), 196 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 4bcddf3ef7..b7a02ed365 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -77,6 +77,7 @@ public class InputFormatConfig {
 
   public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
   public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
+  public static final String FETCH_VIRTUAL_COLUMNS = "iceberg.mr.fetch.virtual.columns";
   public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
 
   public static final String CONFIG_SERIALIZATION_DISABLED = "iceberg.mr.config.serialization.disabled";
@@ -235,6 +236,10 @@ public class InputFormatConfig {
     return readColumns.split(conf.get(serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA)));
   }
 
+  public static boolean fetchVirtualColumns(Configuration conf) {
+    return conf.getBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS, false);
+  }
+
   /**
    * Get Hadoop config key of a catalog property based on catalog name
    * @param catalogName catalog name
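
The new fetchVirtualColumns() helper is a plain boolean lookup with a false default, so jobs that never set the key are unaffected. A minimal round trip, sketched with a throwaway Configuration rather than code from the patch:

    Configuration conf = new Configuration();
    boolean before = InputFormatConfig.fetchVirtualColumns(conf);    // false by default
    conf.setBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS, true);  // done by the Hive side
    boolean after = InputFormatConfig.fetchVirtualColumns(conf);     // now true
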
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 38c75fad94..23d76d4260 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -134,6 +134,8 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
     }
 
     job.set(InputFormatConfig.SELECTED_COLUMNS, job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
+    job.setBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS,
+            job.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false));
     job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID, job.get(TableScanDesc.AS_OF_VERSION, "-1"));
 
@@ -147,6 +149,8 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
   public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job,
                                                                Reporter reporter) throws IOException {
     job.set(InputFormatConfig.SELECTED_COLUMNS, job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
+    job.setBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS,
+            job.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false));
 
     if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && Utilities.getIsVectorized(job)) {
       Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(), "Vectorization only supported for Hive 3+");
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 825ee3dc39..52507cf65c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -338,7 +339,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       if (writeResults.isEmpty()) {
         LOG.info(
             "Not creating a new commit for table: {}, jobID: {}, operation: 
{}, since there were no new files to add",
-            table, jobContext.getJobID(), 
HiveIcebergStorageHandler.operation(conf, name));
+            table, jobContext.getJobID(), 
HiveCustomStorageHandlerUtils.getWriteOperation(conf, name));
       } else {
         commitWrite(table, startTime, writeResults);
       }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 7bf1b1ec0d..00fd9db2f0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -72,7 +73,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
         .tableName(tableName)
         .attemptID(taskAttemptID)
         .poolSize(poolSize)
-        .operation(HiveIcebergStorageHandler.operation(jc, tableName))
+        .operation(HiveCustomStorageHandlerUtils.getWriteOperation(jc, tableName))
         .build();
   }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 99820583a1..4c9eeb8f8d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -145,7 +146,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
   }
 
   private static Schema projectedSchema(Configuration configuration, String tableName, Schema tableSchema) {
-    Context.Operation operation = HiveIcebergStorageHandler.operation(configuration, tableName);
+    Context.Operation operation = HiveCustomStorageHandlerUtils.getWriteOperation(configuration, tableName);
     if (operation != null) {
       switch (operation) {
         case DELETE:
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index c79d344272..a6b5c6deba 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -353,11 +353,12 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }
 
   @Override
-  public DynamicPartitionCtx createDPContext(HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable)
+  public DynamicPartitionCtx createDPContext(
+          HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation)
       throws SemanticException {
     // delete records are already clustered by partition spec id and the hash of the partition struct
     // there is no need to do any additional sorting based on partition columns
-    if (getOperationType().equals(Operation.DELETE.name())) {
+    if (writeOperation == Operation.DELETE) {
       return null;
     }
 
@@ -382,7 +383,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
       fieldOrderMap.put(fields.get(i).name(), i);
     }
 
-    int offset = acidSelectColumns(hmsTable, Operation.valueOf(getOperationType())).size();
+    int offset = acidSelectColumns(hmsTable, writeOperation).size();
     for (PartitionTransformSpec spec : partitionTransformSpecs) {
       int order = fieldOrderMap.get(spec.getColumnName());
       if (PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
@@ -573,15 +574,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     return false;
   }
 
-  public static Operation operation(Configuration conf, String tableName) {
-    if (conf == null || tableName == null) {
-      return null;
-    }
-
-    String operation = conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName);
-    return operation == null ? null : Operation.valueOf(operation);
-  }
-
   /**
    * Returns the Table serialized to the configuration based on the table name.
    * If configuration is missing from the FileIO of the table, it will be populated with the input config.
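
The removed static operation() helper is superseded by HiveCustomStorageHandlerUtils.getWriteOperation(conf, tableName), called throughout this patch with the same (Configuration, String) shape. Its body is not part of this excerpt; presumably it performs the lookup the deleted method did, roughly like this sketch (WRITE_OPERATION_PREFIX is a hypothetical stand-in for whatever key the utility really uses):

    public static Context.Operation getWriteOperation(Configuration conf, String tableName) {
      if (conf == null || tableName == null) {
        return null;                                                     // same null-tolerance as before
      }
      String operation = conf.get(WRITE_OPERATION_PREFIX + tableName);   // hypothetical key
      return operation == null ? null : Context.Operation.valueOf(operation);
    }
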
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index 73fe9743e7..6b4bca01e2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -19,15 +19,21 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -40,12 +46,18 @@ public class IcebergAcidUtil {
 
   private static final Types.NestedField PARTITION_STRUCT_META_COL = null; // placeholder value in the map
   private static final Map<Types.NestedField, Integer> FILE_READ_META_COLS = Maps.newLinkedHashMap();
+  private static final Map<String, Types.NestedField> VIRTUAL_COLS_TO_META_COLS = Maps.newLinkedHashMap();
 
   static {
     FILE_READ_META_COLS.put(MetadataColumns.SPEC_ID, 0);
     FILE_READ_META_COLS.put(PARTITION_STRUCT_META_COL, 1);
     FILE_READ_META_COLS.put(MetadataColumns.FILE_PATH, 2);
     FILE_READ_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
+
+    VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.PARTITION_SPEC_ID.getName(), MetadataColumns.SPEC_ID);
+    VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.PARTITION_HASH.getName(), PARTITION_STRUCT_META_COL);
+    VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.FILE_PATH.getName(), MetadataColumns.FILE_PATH);
+    VIRTUAL_COLS_TO_META_COLS.put(VirtualColumn.ROW_POSITION.getName(), MetadataColumns.ROW_POSITION);
   }
 
   private static final Types.NestedField PARTITION_HASH_META_COL = Types.NestedField.required(
@@ -62,9 +74,9 @@ public class IcebergAcidUtil {
   /**
    * @param dataCols The columns of the original file read schema
    * @param table The table object - it is used for populating the partition struct meta column
-   * @return The schema for reading files, extended with metadata columns needed for deletes
+   * @return The schema for reading files, extended with metadata columns
    */
-  public static Schema createFileReadSchemaForDelete(List<Types.NestedField> dataCols, Table table) {
+  public static Schema createFileReadSchemaWithVirtualColums(List<Types.NestedField> dataCols, Table table) {
     List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
     FILE_READ_META_COLS.forEach((metaCol, index) -> {
       if (metaCol == PARTITION_STRUCT_META_COL) {
@@ -201,4 +213,50 @@ public class IcebergAcidUtil {
     }
     return partHash;
   }
+
+  public static void copyFields(GenericRecord source, int start, int len, GenericRecord target) {
+    for (int sourceIdx = start, targetIdx = 0; targetIdx < len; ++sourceIdx, ++targetIdx) {
+      target.set(targetIdx, source.get(sourceIdx));
+    }
+  }
+
+  public static class VirtualColumnAwareIterator<T> implements CloseableIterator<T> {
+
+    private final CloseableIterator<T> currentIterator;
+
+    private GenericRecord current;
+    private final Schema expectedSchema;
+    private final Configuration conf;
+
+    public VirtualColumnAwareIterator(CloseableIterator<T> currentIterator, Schema expectedSchema, Configuration conf) {
+      this.currentIterator = currentIterator;
+      this.expectedSchema = expectedSchema;
+      this.conf = conf;
+      current = GenericRecord.create(
+          new Schema(expectedSchema.columns().subList(4, expectedSchema.columns().size())));
+    }
+
+    @Override
+    public void close() throws IOException {
+      currentIterator.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currentIterator.hasNext();
+    }
+
+    @Override
+    public T next() {
+      T next = currentIterator.next();
+      GenericRecord rec = (GenericRecord) next;
+      PositionDeleteInfo.setIntoConf(conf,
+          IcebergAcidUtil.parseSpecId(rec),
+          IcebergAcidUtil.computePartitionHash(rec),
+          IcebergAcidUtil.parseFilePath(rec),
+          IcebergAcidUtil.parseFilePosition(rec));
+      IcebergAcidUtil.copyFields(rec, FILE_READ_META_COLS.size(), current.size(), current);
+      return (T) current;
+    }
+  }
 }
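
Reading VirtualColumnAwareIterator end to end: the wrapped reader yields records whose first four fields are the metadata columns registered in FILE_READ_META_COLS (spec id, partition struct, file path, row position). Each next() publishes those four values to Hive through PositionDeleteInfo.setIntoConf(), then copies the remaining fields into a reusable record matching the original projection. A hedged usage sketch, where reader, schema, and conf are assumed context (an open Iceberg reader for one task, the extended read schema, and the task's Configuration):

    CloseableIterator<Record> raw = reader.iterator();  // metadata columns first, then data columns
    CloseableIterator<Record> it =
        new IcebergAcidUtil.VirtualColumnAwareIterator<>(raw, schema, conf);
    while (it.hasNext()) {
      Record dataOnly = it.next();  // data fields only; virtual column values are now in conf
    }
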
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 0ccb750fda..7bbd03a09e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -32,8 +32,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
-import org.apache.hadoop.hive.ql.Context.Operation;
-import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -61,7 +59,6 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.data.GenericDeleteFilter;
-import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.avro.DataReader;
@@ -241,7 +238,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     private T current;
     private CloseableIterator<T> currentIterator;
     private Table table;
-    private boolean updateOrDelete;
+    private boolean fetchVirtualColumns;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
@@ -258,9 +255,16 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
       this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
              InputFormatConfig.InMemoryDataModel.GENERIC);
-      this.currentIterator = open(tasks.next(), expectedSchema).iterator();
-      Operation operation = HiveIcebergStorageHandler.operation(conf, conf.get(Catalogs.NAME));
-      this.updateOrDelete = Operation.DELETE.equals(operation) || Operation.UPDATE.equals(operation);
+      this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+      this.currentIterator = nextTask();
+    }
+
+    private CloseableIterator<T> nextTask() {
+      CloseableIterator<T> closeableIterator = open(tasks.next(), expectedSchema).iterator();
+      if (!fetchVirtualColumns) {
+        return closeableIterator;
+      }
+      return new IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator, expectedSchema, conf);
     }
 
     @Override
@@ -268,18 +272,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       while (true) {
         if (currentIterator.hasNext()) {
           current = currentIterator.next();
-          if (updateOrDelete) {
-            GenericRecord rec = (GenericRecord) current;
-            PositionDeleteInfo.setIntoConf(conf,
-                IcebergAcidUtil.parseSpecId(rec),
-                IcebergAcidUtil.computePartitionHash(rec),
-                IcebergAcidUtil.parseFilePath(rec),
-                IcebergAcidUtil.parseFilePosition(rec));
-          }
           return true;
         } else if (tasks.hasNext()) {
           currentIterator.close();
-          currentIterator = open(tasks.next(), expectedSchema).iterator();
+          this.currentIterator = nextTask();
         } else {
           currentIterator.close();
           return false;
@@ -511,24 +507,11 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       readSchema = caseSensitive ? table.schema().select(selectedColumns) :
           table.schema().caseInsensitiveSelect(selectedColumns);
 
-      Operation operation = HiveIcebergStorageHandler.operation(conf, conf.get(Catalogs.NAME));
-      if (operation != null) {
-        switch (operation) {
-          case DELETE:
-            // for DELETE queries, add additional metadata columns into the read schema
-            return IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
-          case UPDATE:
-            // for UPDATE queries, add additional metadata columns into the read schema
-            return IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
-          case OTHER:
-            // for INSERT queries no extra columns are needed
-            return readSchema;
-          default:
-            throw new IllegalArgumentException("Not supported operation " + operation);
-        }
-      } else {
-        return readSchema;
+      if (InputFormatConfig.fetchVirtualColumns(conf)) {
+        return IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table);
       }
+
+      return readSchema;
     }
 
     private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integer, ?> idToConstant) {
@@ -548,5 +531,4 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       return TypeUtil.selectNot(readSchema, collect);
     }
   }
-
 }
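
The upshot in IcebergInputFormat: the reader no longer infers anything from the write operation. Schema widening and per-row metadata publication both hinge on the one flag, so a plain SELECT of PARTITION__SPEC__ID or ROW__POSITION now takes the same path a DELETE or UPDATE does. The per-task wrapping introduced above can be pictured as this sketch of nextTask() (a condensed reading, not the verbatim method):

    private CloseableIterator<T> nextTask() {
      CloseableIterator<T> it = open(tasks.next(), expectedSchema).iterator();
      // wrap only when Hive asked for virtual columns; otherwise return the raw iterator
      return fetchVirtualColumns
          ? new IcebergAcidUtil.VirtualColumnAwareIterator<T>(it, expectedSchema, conf)
          : it;
    }
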
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_virtualcol.q b/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_virtualcol.q
new file mode 100644
index 0000000000..bf27fa291b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_virtualcol.q
@@ -0,0 +1,9 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string) partitioned by (c int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 50), (3, 'three', 50), (4, 'four', 52), (5, 'five', 54), (111, 'one', 52), (333, 'two', 56);
+
+select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+order by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc;
+
+select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+sort by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out
index b3d9d7f5e8..e173ea1c84 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out
@@ -69,20 +69,24 @@ STAGE PLANS:
                 TableScan
                   alias: tbl_ice_mixed
                   Statistics: Num rows: 19 Data size: 1748 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: max(a)
-                    keys: b (type: string)
-                    minReductionHashAggr: 0.4736842
-                    mode: hash
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 10 Data size: 920 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string)
-                      null sort order: z
-                      sort order: +
-                      Map-reduce partition columns: _col0 (type: string)
+                  Select Operator
+                    expressions: a (type: int), b (type: string)
+                    outputColumnNames: a, b
+                    Statistics: Num rows: 19 Data size: 1748 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: max(a)
+                      keys: b (type: string)
+                      minReductionHashAggr: 0.4736842
+                      mode: hash
+                      outputColumnNames: _col0, _col1
                       Statistics: Num rows: 10 Data size: 920 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col1 (type: int)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 10 Data size: 920 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: int)
             Execution mode: vectorized, llap
             LLAP IO: all inputs (cache only)
         Reducer 2 
@@ -207,20 +211,24 @@ STAGE PLANS:
                 TableScan
                   alias: tbl_ice_mixed_all_types
                   Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: max(t_float)
-                    keys: t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
-                    minReductionHashAggr: 0.99
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  Select Operator
+                    expressions: t_float (type: float), t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+                    outputColumnNames: t_float, t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
                     Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
-                      null sort order: zzzzzzzzz
-                      sort order: +++++++++
-                      Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                    Group By Operator
+                      aggregations: max(t_float)
+                      keys: t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+                      minReductionHashAggr: 0.99
+                      mode: hash
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
                       Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col9 (type: float)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                        null sort order: zzzzzzzzz
+                        sort order: +++++++++
+                        Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                        Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col9 (type: float)
             Execution mode: vectorized, llap
             LLAP IO: all inputs (cache only)
         Reducer 2 
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out
index f1207fbec1..be5c780551 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_orc.q.out
@@ -53,20 +53,24 @@ STAGE PLANS:
                 TableScan
                   alias: tbl_ice_orc
                   Statistics: Num rows: 10 Data size: 920 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: max(a)
-                    keys: b (type: string)
-                    minReductionHashAggr: 0.5
-                    mode: hash
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string)
-                      null sort order: z
-                      sort order: +
-                      Map-reduce partition columns: _col0 (type: string)
+                  Select Operator
+                    expressions: a (type: int), b (type: string)
+                    outputColumnNames: a, b
+                    Statistics: Num rows: 10 Data size: 920 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: max(a)
+                      keys: b (type: string)
+                      minReductionHashAggr: 0.5
+                      mode: hash
+                      outputColumnNames: _col0, _col1
                       Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col1 (type: int)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: int)
             Execution mode: vectorized, llap
             LLAP IO: all inputs (cache only)
         Reducer 2 
@@ -170,20 +174,24 @@ STAGE PLANS:
                 TableScan
                   alias: tbl_ice_orc_all_types
                   Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: max(t_float)
-                    keys: t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
-                    minReductionHashAggr: 0.99
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  Select Operator
+                    expressions: t_float (type: float), t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+                    outputColumnNames: t_float, t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
                     Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
-                      null sort order: zzzzzzzzz
-                      sort order: +++++++++
-                      Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                    Group By Operator
+                      aggregations: max(t_float)
+                      keys: t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+                      minReductionHashAggr: 0.99
+                      mode: hash
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
                       Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col9 (type: float)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                        null sort order: zzzzzzzzz
+                        sort order: +++++++++
+                        Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                        Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col9 (type: float)
             Execution mode: vectorized, llap
             LLAP IO: all inputs (cache only)
         Reducer 2 
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out
index e1d6dfa18c..358586d43b 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out
@@ -53,20 +53,24 @@ STAGE PLANS:
                 TableScan
                   alias: tbl_ice_parquet
                   Statistics: Num rows: 10 Data size: 920 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: max(a)
-                    keys: b (type: string)
-                    minReductionHashAggr: 0.5
-                    mode: hash
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string)
-                      null sort order: z
-                      sort order: +
-                      Map-reduce partition columns: _col0 (type: string)
+                  Select Operator
+                    expressions: a (type: int), b (type: string)
+                    outputColumnNames: a, b
+                    Statistics: Num rows: 10 Data size: 920 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: max(a)
+                      keys: b (type: string)
+                      minReductionHashAggr: 0.5
+                      mode: hash
+                      outputColumnNames: _col0, _col1
                       Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col1 (type: int)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: int)
             Execution mode: vectorized, llap
             LLAP IO: all inputs (cache only)
         Reducer 2 
@@ -170,20 +174,24 @@ STAGE PLANS:
                 TableScan
                   alias: tbl_ice_parquet_all_types
                   Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: max(t_float)
-                    keys: t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
-                    minReductionHashAggr: 0.99
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  Select Operator
+                    expressions: t_float (type: float), t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+                    outputColumnNames: t_float, t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
                     Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
-                      null sort order: zzzzzzzzz
-                      sort order: +++++++++
-                      Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                    Group By Operator
+                      aggregations: max(t_float)
+                      keys: t_double (type: double), t_boolean (type: boolean), t_int (type: int), t_bigint (type: bigint), t_binary (type: binary), t_string (type: string), t_timestamp (type: timestamp), t_date (type: date), t_decimal (type: decimal(4,2))
+                      minReductionHashAggr: 0.99
+                      mode: hash
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
                       Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col9 (type: float)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                        null sort order: zzzzzzzzz
+                        sort order: +++++++++
+                        Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2))
+                        Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col9 (type: float)
             Execution mode: vectorized, llap
             LLAP IO: all inputs (cache only)
         Reducer 2 
diff --git a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_virtualcol.q.out b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_virtualcol.q.out
new file mode 100644
index 0000000000..be0ec022fa
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_virtualcol.q.out
@@ -0,0 +1,54 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string) partitioned by (c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string) partitioned by (c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 50), (3, 'three', 50), (4, 'four', 52), (5, 'five', 54), (111, 'one', 52), (333, 'two', 56)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 50), (3, 'three', 50), (4, 'four', 52), (5, 'five', 54), (111, 'one', 52), (333, 'two', 56)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+order by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+order by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+3      50      0       81      2
+2      50      0       81      1
+1      50      0       81      0
+111    52      0       83      1
+4      52      0       83      0
+5      54      0       85      0
+333    56      0       87      0
+PREHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+sort by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select a, c, tbl_ice.PARTITION__SPEC__ID, tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION from tbl_ice
+sort by tbl_ice.PARTITION__HASH, tbl_ice.ROW__POSITION desc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+3      50      0       81      2
+2      50      0       81      1
+1      50      0       81      0
+111    52      0       83      1
+4      52      0       83      0
+5      54      0       85      0
+333    56      0       87      0
diff --git a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out
index e6d990caa4..2701494205 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out
@@ -62,16 +62,18 @@ Stage-0
     limit:-1
     Stage-1
      Reducer 2 vectorized
-      File Output Operator [FS_10]
-        Group By Operator [GBY_9] (rows=10 width=92)
+      File Output Operator [FS_11]
+        Group By Operator [GBY_10] (rows=10 width=92)
          Output:["_col0","_col1"],aggregations:["max(VALUE._col0)"],keys:KEY._col0
        <-Map 1 [SIMPLE_EDGE] vectorized
-          SHUFFLE [RS_8]
+          SHUFFLE [RS_9]
            PartitionCols:_col0
-            Group By Operator [GBY_7] (rows=10 width=92)
+            Group By Operator [GBY_8] (rows=10 width=92)
              Output:["_col0","_col1"],aggregations:["max(a)"],keys:b
-              TableScan [TS_0] (rows=19 width=92)
-                default@tbl_ice_mixed,tbl_ice_mixed,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
+              Select Operator [SEL_7] (rows=19 width=92)
+                Output:["a","b"]
+                TableScan [TS_0] (rows=19 width=92)
+                  default@tbl_ice_mixed,tbl_ice_mixed,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
 
 PREHOOK: query: select b, max(a) from tbl_ice_mixed group by b
 PREHOOK: type: QUERY
@@ -165,18 +167,20 @@ Stage-0
     limit:-1
     Stage-1
      Reducer 2 vectorized
-      File Output Operator [FS_11]
-        Select Operator [SEL_10] (rows=2 width=373)
+      File Output Operator [FS_12]
+        Select Operator [SEL_11] (rows=2 width=373)
          Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
-          Group By Operator [GBY_9] (rows=2 width=373)
+          Group By Operator [GBY_10] (rows=2 width=373)
            Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7, KEY._col8
          <-Map 1 [SIMPLE_EDGE] vectorized
-            SHUFFLE [RS_8]
+            SHUFFLE [RS_9]
              PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8
-              Group By Operator [GBY_7] (rows=2 width=373)
+              Group By Operator [GBY_8] (rows=2 width=373)
                Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
-                TableScan [TS_0] (rows=2 width=373)
-                  default@tbl_ice_mixed_all_types,tbl_ice_mixed_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+                Select Operator [SEL_7] (rows=2 width=373)
+                  Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+                  TableScan [TS_0] (rows=2 width=373)
+                    default@tbl_ice_mixed_all_types,tbl_ice_mixed_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
 
 PREHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_mixed_all_types
         group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
diff --git a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out
index 25764be089..e1d25d5321 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_orc.q.out
@@ -46,16 +46,18 @@ Stage-0
     limit:-1
     Stage-1
      Reducer 2 vectorized
-      File Output Operator [FS_10]
-        Group By Operator [GBY_9] (rows=5 width=92)
+      File Output Operator [FS_11]
+        Group By Operator [GBY_10] (rows=5 width=92)
          Output:["_col0","_col1"],aggregations:["max(VALUE._col0)"],keys:KEY._col0
        <-Map 1 [SIMPLE_EDGE] vectorized
-          SHUFFLE [RS_8]
+          SHUFFLE [RS_9]
            PartitionCols:_col0
-            Group By Operator [GBY_7] (rows=5 width=92)
+            Group By Operator [GBY_8] (rows=5 width=92)
              Output:["_col0","_col1"],aggregations:["max(a)"],keys:b
-              TableScan [TS_0] (rows=10 width=92)
-                default@tbl_ice_orc,tbl_ice_orc,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
+              Select Operator [SEL_7] (rows=10 width=92)
+                Output:["a","b"]
+                TableScan [TS_0] (rows=10 width=92)
+                  default@tbl_ice_orc,tbl_ice_orc,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
 
 PREHOOK: query: select b, max(a) from tbl_ice_orc group by b
 PREHOOK: type: QUERY
@@ -128,18 +130,20 @@ Stage-0
     limit:-1
     Stage-1
      Reducer 2 vectorized
-      File Output Operator [FS_11]
-        Select Operator [SEL_10] (rows=1 width=372)
+      File Output Operator [FS_12]
+        Select Operator [SEL_11] (rows=1 width=372)
          Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
-          Group By Operator [GBY_9] (rows=1 width=372)
+          Group By Operator [GBY_10] (rows=1 width=372)
            Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7, KEY._col8
          <-Map 1 [SIMPLE_EDGE] vectorized
-            SHUFFLE [RS_8]
+            SHUFFLE [RS_9]
              PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8
-              Group By Operator [GBY_7] (rows=1 width=372)
+              Group By Operator [GBY_8] (rows=1 width=372)
                Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
-                TableScan [TS_0] (rows=1 width=372)
-                  default@tbl_ice_orc_all_types,tbl_ice_orc_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+                Select Operator [SEL_7] (rows=1 width=372)
+                  Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+                  TableScan [TS_0] (rows=1 width=372)
+                    default@tbl_ice_orc_all_types,tbl_ice_orc_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
 
 PREHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_orc_all_types
         group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
diff --git a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out
index 201a5efa4a..1a1782d7dd 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_parquet.q.out
@@ -46,16 +46,18 @@ Stage-0
     limit:-1
     Stage-1
      Reducer 2 vectorized
-      File Output Operator [FS_10]
-        Group By Operator [GBY_9] (rows=5 width=92)
+      File Output Operator [FS_11]
+        Group By Operator [GBY_10] (rows=5 width=92)
          Output:["_col0","_col1"],aggregations:["max(VALUE._col0)"],keys:KEY._col0
        <-Map 1 [SIMPLE_EDGE] vectorized
-          SHUFFLE [RS_8]
+          SHUFFLE [RS_9]
            PartitionCols:_col0
-            Group By Operator [GBY_7] (rows=5 width=92)
+            Group By Operator [GBY_8] (rows=5 width=92)
              Output:["_col0","_col1"],aggregations:["max(a)"],keys:b
-              TableScan [TS_0] (rows=10 width=92)
-                default@tbl_ice_parquet,tbl_ice_parquet,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
+              Select Operator [SEL_7] (rows=10 width=92)
+                Output:["a","b"]
+                TableScan [TS_0] (rows=10 width=92)
+                  default@tbl_ice_parquet,tbl_ice_parquet,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
 
 PREHOOK: query: select b, max(a) from tbl_ice_parquet group by b
 PREHOOK: type: QUERY
@@ -128,18 +130,20 @@ Stage-0
     limit:-1
     Stage-1
      Reducer 2 vectorized
-      File Output Operator [FS_11]
-        Select Operator [SEL_10] (rows=1 width=372)
+      File Output Operator [FS_12]
+        Select Operator [SEL_11] (rows=1 width=372)
          Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
-          Group By Operator [GBY_9] (rows=1 width=372)
+          Group By Operator [GBY_10] (rows=1 width=372)
            Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7, KEY._col8
          <-Map 1 [SIMPLE_EDGE] vectorized
-            SHUFFLE [RS_8]
+            SHUFFLE [RS_9]
              PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8
-              Group By Operator [GBY_7] (rows=1 width=372)
+              Group By Operator [GBY_8] (rows=1 width=372)
                Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
-                TableScan [TS_0] (rows=1 width=372)
-                  default@tbl_ice_parquet_all_types,tbl_ice_parquet_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+                Select Operator [SEL_7] (rows=1 width=372)
+                  Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
+                  TableScan [TS_0] (rows=1 width=372)
+                    default@tbl_ice_parquet_all_types,tbl_ice_parquet_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"]
 
 PREHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal from tbl_ice_parquet_all_types
         group by t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index a70a6356a0..64a9a1becd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -78,8 +78,8 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
       if (source instanceof TableScanOperator) {
         TableScanOperator ts = (TableScanOperator) source;
         // push down projections
-        ColumnProjectionUtils.appendReadColumns(
-            job, ts.getNeededColumnIDs(), ts.getNeededColumns(), 
ts.getNeededNestedColumnPaths());
+        ColumnProjectionUtils.appendReadColumns(job, ts.getNeededColumnIDs(), 
ts.getNeededColumns(),
+                ts.getNeededNestedColumnPaths(), 
ts.getConf().hasVirtualCols());
         // push down filters and as of information
         HiveInputFormat.pushFiltersAndAsOf(job, ts, null);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 91cf0c3c03..4204d920f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperation;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -616,6 +617,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       initializeSpecPath();
       fs = specPath.getFileSystem(hconf);
 
+      setWriteOperation(hconf, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
       if (hconf instanceof JobConf) {
         jc = (JobConf) hconf;
       } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 8c328234ad..1fdc03557e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -370,12 +370,14 @@ public class MapOperator extends AbstractMapOperator {
           Configuration clonedConf = new Configuration(hconf);
           clonedConf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
           clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+          clonedConf.unset(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR);
           clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
           tableNameToConf.put(tableName, clonedConf);
         }
         Configuration newConf = tableNameToConf.get(tableName);
         ColumnProjectionUtils.appendReadColumns(newConf, tableScanDesc.getNeededColumnIDs(),
-            tableScanDesc.getOutputColumnNames(), tableScanDesc.getNeededNestedColumnPaths());
+            tableScanDesc.getOutputColumnNames(), tableScanDesc.getNeededNestedColumnPaths(),
+            tableScanDesc.hasVirtualCols());
       }
     }
 
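For illustration, a minimal standalone sketch of the per-table Configuration reset done in the MapOperator hunk above, assuming hadoop-common and hive-serde are on the classpath; the flag value set here is made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

    public class PerTableConfSketch {
      public static void main(String[] args) {
        Configuration hconf = new Configuration(false);
        hconf.setBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, true);

        // Clone, then clear all projection state (including the new
        // virtual-columns flag) before appending table-specific columns,
        // so values pushed for one table scan do not leak into another's.
        Configuration clonedConf = new Configuration(hconf);
        clonedConf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
        clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
        clonedConf.unset(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR);
        clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);

        // The clone no longer carries the flag; the original is untouched.
        System.out.println(clonedConf.getBoolean(
            ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false)); // false
        System.out.println(hconf.getBoolean(
            ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false)); // true
      }
    }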
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index e58036a0bb..b3f167c3e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -202,8 +202,8 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp
 
       TableScanOperator ts = (TableScanOperator)aliasToWork.get(alias);
       // push down projections
-      ColumnProjectionUtils.appendReadColumns(
-          jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths());
+      ColumnProjectionUtils.appendReadColumns(jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(),
+              ts.getNeededNestedColumnPaths(), ts.conf.hasVirtualCols());
       // push down filters and as of information
       HiveInputFormat.pushFiltersAndAsOf(jobClone, ts, null);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 0d4289a020..62b74dc842 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -477,8 +477,8 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
 
       TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey());
       // push down projections
-      ColumnProjectionUtils.appendReadColumns(
-          jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths());
+      ColumnProjectionUtils.appendReadColumns(jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(),
+              ts.getNeededNestedColumnPaths(), ts.getConf().hasVirtualCols());
       // push down filters and as of information
       HiveInputFormat.pushFiltersAndAsOf(jobClone, ts, null);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 5a3def4d0d..403fb9ed01 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -786,6 +786,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, ""));;
     StringBuilder readColumnNamesBuffer = new StringBuilder(newjob.
       get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
+    boolean fetchVirtualColumns = newjob.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false);
     // for each dir, get the InputFormat, and do getSplits.
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
@@ -804,8 +805,10 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
           readColumnsBuffer.setLength(0);
           readColumnNamesBuffer.setLength(0);
           // push down projections.
-          ColumnProjectionUtils.appendReadColumns(readColumnsBuffer, readColumnNamesBuffer,
+          ColumnProjectionUtils.appendReadColumns(
+                  readColumnsBuffer, readColumnNamesBuffer,
             tableScan.getNeededColumnIDs(), tableScan.getNeededColumns());
+          fetchVirtualColumns = tableScan.getConf().hasVirtualCols();
           pushDownProjection = true;
           // push down filters and as of information
           pushFiltersAndAsOf(newjob, tableScan, this.mrwork);
@@ -829,7 +832,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
         // set columns to read in conf
         if (pushDownProjection) {
-          pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer);
+          pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer, fetchVirtualColumns);
         }
 
         addSplitsForGroup(currentDirs, currentTableScan, newjob,
@@ -847,7 +850,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
     // set columns to read in conf
     if (pushDownProjection) {
-      pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer);
+      pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer, fetchVirtualColumns);
     }
 
     if (dirs.length != 0) { // TODO: should this be currentDirs?
@@ -865,15 +868,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   private void pushProjection(final JobConf newjob, final StringBuilder readColumnsBuffer,
-      final StringBuilder readColumnNamesBuffer) {
+      final StringBuilder readColumnNamesBuffer, final boolean fetchVirtualColumns) {
     String readColIds = readColumnsBuffer.toString();
     String readColNames = readColumnNamesBuffer.toString();
     newjob.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
     newjob.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIds);
     newjob.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames);
+    newjob.setBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, fetchVirtualColumns);
 
     LOG.info("{} = {}", ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIds);
     LOG.info("{} = {}", ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames);
+    LOG.info("{} = {}", ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, fetchVirtualColumns);
   }
 
 
@@ -1042,7 +1047,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         TableScanOperator ts = (TableScanOperator) op;
         // push down projections.
         ColumnProjectionUtils.appendReadColumns(
-            jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths());
+                jobConf,
+                ts.getNeededColumnIDs(),
+                ts.getNeededColumns(),
+                ts.getNeededNestedColumnPaths(),
+                ts.getConf().hasVirtualCols());
         // push down filters and as of information
         pushFiltersAndAsOf(jobConf, ts, this.mrwork);
 
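For illustration, a minimal sketch of what pushProjection() now records in the JobConf and how a downstream reader (for instance the Iceberg input format) could consult it; the column ids and names are made-up values:

    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
    import org.apache.hadoop.mapred.JobConf;

    public class PushProjectionSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf(false);

        // The projection state pushProjection() writes for downstream readers.
        job.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
        job.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,1");
        job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "a,b");
        job.setBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, true);

        // A record reader can then decide whether to project the ACID virtual
        // columns on top of the data columns.
        boolean fetchVirtual =
            job.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false);
        System.out.println("fetch virtual columns: " + fetchVirtual);
      }
    }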
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index 6d525ba269..f138c1e254 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -164,7 +164,7 @@ public class ProjectionPusher {
 
   private void pushFilters(final JobConf jobConf, RowSchema rowSchema, ExprNodeGenericFuncDesc filterExpr) {
     // construct column name list for reference by filter push down
     // construct column name list for reference by filter push down
-    Utilities.setColumnNameList(jobConf, rowSchema);
+    Utilities.setColumnNameList(jobConf, rowSchema, true);
 
     // push down filters
     if (filterExpr == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 8d75db400b..5dac8c0661 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -371,7 +371,8 @@ public interface HiveStorageHandler extends Configurable {
    * @return the created DP context object, null if DP context / sorting is not required
    * @throws SemanticException
    */
-  default DynamicPartitionCtx createDPContext(HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table)
+  default DynamicPartitionCtx createDPContext(
+          HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table, Operation writeOperation)
       throws SemanticException {
     Preconditions.checkState(alwaysUnpartitioned(), "Should only be called for table formats where partitioning " +
         "is not handled by Hive but the table format itself. See alwaysUnpartitioned() method.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 618c60baa7..c1dc1feb02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8131,6 +8131,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                                           RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas,
                                           Integer dest_type, QB qb, boolean isDirectInsert, AcidUtils.Operation acidOperation, String moveTaskId) throws SemanticException {
     boolean isInsertOverwrite = false;
+    Context.Operation writeOperation = getWriteOperation(dest);
     switch (dest_type) {
     case QBMetaData.DEST_PARTITION:
       //fall through
@@ -8146,7 +8147,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // Some non-native tables might be partitioned without partition spec information being present in the Table object
       HiveStorageHandler storageHandler = dest_tab.getStorageHandler();
       if (storageHandler != null && storageHandler.alwaysUnpartitioned()) {
-        DynamicPartitionCtx nonNativeDpCtx = storageHandler.createDPContext(conf, dest_tab);
+        DynamicPartitionCtx nonNativeDpCtx = storageHandler.createDPContext(conf, dest_tab, writeOperation);
         if (dpCtx == null && nonNativeDpCtx != null) {
           dpCtx = nonNativeDpCtx;
         }
@@ -8187,6 +8188,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       acidFileSinks.add(fileSinkDesc);
     }
 
+    fileSinkDesc.setWriteOperation(writeOperation);
+
    fileSinkDesc.setTemporary(destTableIsTemporary);
     fileSinkDesc.setMaterialization(destTableIsMaterialization);
 
@@ -11478,20 +11481,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), alias, true));
       }
 
+      // put virtual columns into RowResolver.
       List<VirtualColumn> vcList = new ArrayList<>();
-      boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(tab);
-      boolean isUpdateDelete = this instanceof UpdateDeleteSemanticAnalyzer;
-      // put all virtual columns in RowResolver.
-      if (!tab.isNonNative() || (nonNativeAcid && isUpdateDelete)) {
-        vcList = VirtualColumn.getRegistry(conf);
-        if (nonNativeAcid && isUpdateDelete) {
-          vcList.addAll(tab.getStorageHandler().acidVirtualColumns());
-        }
-        vcList.forEach(vc -> rwsch.put(alias, vc.getName().toLowerCase(), new ColumnInfo(vc.getName(),
-            vc.getTypeInfo(), alias, true, vc.getIsHidden()
-        )));
+      if (!tab.isNonNative()) {
+        vcList.addAll(VirtualColumn.getRegistry(conf));
+      }
+      if (tab.isNonNative() && AcidUtils.isNonNativeAcidTable(tab)) {
+        vcList.addAll(tab.getStorageHandler().acidVirtualColumns());
       }
 
+      vcList.forEach(vc -> rwsch.put(alias, vc.getName().toLowerCase(), new ColumnInfo(vc.getName(),
+              vc.getTypeInfo(), alias, true, vc.getIsHidden()
+      )));
+
       // Create the root of the operator tree
       TableScanDesc tsDesc = new TableScanDesc(alias, vcList, tab);
       setupStats(tsDesc, qb.getParseInfo(), tab, alias, rwsch);
@@ -15018,6 +15020,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             AcidUtils.Operation.INSERT);
   }
 
+  private Context.Operation getWriteOperation(String destination) {
+    return deleting(destination) ? Context.Operation.DELETE :
+        (updating(destination) ? Context.Operation.UPDATE :
+            Context.Operation.OTHER);
+  }
+
   private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest,
       boolean isMM) {
 
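For illustration, a condensed standalone restatement of the new getWriteOperation() mapping above; the boolean parameters stand in for the analyzer's deleting(dest)/updating(dest) checks:

    import org.apache.hadoop.hive.ql.Context;

    public class WriteOperationSketch {
      // DELETE wins over UPDATE; anything else maps to OTHER.
      static Context.Operation writeOperation(boolean deleting, boolean updating) {
        return deleting ? Context.Operation.DELETE
            : (updating ? Context.Operation.UPDATE : Context.Operation.OTHER);
      }

      public static void main(String[] args) {
        System.out.println(writeOperation(true, false));  // DELETE
        System.out.println(writeOperation(false, true));  // UPDATE
        System.out.println(writeOperation(false, false)); // OTHER
      }
    }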
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 70121096a8..3b215a7acf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -102,9 +102,6 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
     validateTxnManager(mTable);
     validateTargetTable(mTable);
 
-    // save the operation type into the query state
-    SessionStateUtil.addResource(conf, Context.Operation.class.getSimpleName(), operation.name());
-
     StringBuilder rewrittenQueryStr = new StringBuilder();
     rewrittenQueryStr.append("insert into table ");
     rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 67ae275363..f2742bdfe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -27,6 +26,7 @@ import java.util.Set;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
@@ -96,6 +96,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
 
   // Record what type of write this is.  Default is non-ACID (ie old style).
   private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
+  private Context.Operation writeOperation = Context.Operation.OTHER;
   private long tableWriteId = 0;  // table write id for this operation
   private int statementId = -1;
   private int maxStmtId = -1;
@@ -582,6 +583,15 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   public AcidUtils.Operation getWriteType() {
     return writeType;
   }
+
+  public void setWriteOperation(Context.Operation writeOperation) {
+    this.writeOperation = writeOperation;
+  }
+
+  public Context.Operation getWriteOperation() {
+    return writeOperation;
+  }
+
   @Explain(displayName = "Write Type")
   public String getWriteTypeString() {
     return getWriteType() == AcidUtils.Operation.NOT_ACID ? null : getWriteType().toString();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
index 0787daa7b7..8be4cfc5b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
@@ -20,12 +20,18 @@ package org.apache.hadoop.hive.ql.security.authorization;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 
 public class HiveCustomStorageHandlerUtils {
 
+    public static final String WRITE_OPERATION_CONFIG_PREFIX = "file.sink.write.operation.";
+
+
     public static String getTablePropsForCustomStorageHandler(Map<String, String> tableProperties) {
         StringBuilder properties = new StringBuilder();
         for (Map.Entry<String,String> serdeMap : tableProperties.entrySet()) {
@@ -48,4 +54,21 @@ public class HiveCustomStorageHandlerUtils {
             .ifPresent(tblProps::putAll);
         return tblProps;
     }
+
+    public static Context.Operation getWriteOperation(Configuration conf, String tableName) {
+        if (conf == null || tableName == null) {
+            return null;
+        }
+
+        String operation = conf.get(WRITE_OPERATION_CONFIG_PREFIX + tableName);
+        return operation == null ? null : Context.Operation.valueOf(operation);
+    }
+
+    public static void setWriteOperation(Configuration conf, String tableName, Context.Operation operation) {
+        if (conf == null || tableName == null) {
+            return;
+        }
+
+        conf.set(WRITE_OPERATION_CONFIG_PREFIX + tableName, operation.name());
+    }
 }
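For illustration, a minimal sketch of how the new helpers round-trip the write operation through the Configuration under "file.sink.write.operation.<table>"; the table name used is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;

    public class WriteOperationConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);

        // FileSinkOperator records the operation per table (see the
        // FileSinkOperator hunk above); "default.tbl_ice" is a made-up name.
        HiveCustomStorageHandlerUtils.setWriteOperation(
            conf, "default.tbl_ice", Context.Operation.DELETE);

        // A storage handler can later read it back when performing the write.
        Context.Operation op =
            HiveCustomStorageHandlerUtils.getWriteOperation(conf, "default.tbl_ice");
        System.out.println(op); // DELETE
      }
    }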
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
index 70187b6122..5cd776a2e3 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
@@ -52,6 +52,7 @@ public final class ColumnProjectionUtils {
     "hive.io.file.readNestedColumn.paths";
   public static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
   public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
+  public static final String FETCH_VIRTUAL_COLUMNS_CONF_STR = "hive.io.file.fetch.virtual.columns";
   private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
   private static final String READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT = "";
   private static final boolean READ_ALL_COLUMNS_DEFAULT = true;
@@ -161,7 +162,7 @@ public final class ColumnProjectionUtils {
    * @param names Column names.
    */
   public static void appendReadColumns(
-      Configuration conf, List<Integer> ids, List<String> names, List<String> groupPaths) {
+      Configuration conf, List<Integer> ids, List<String> names, List<String> groupPaths, boolean fetchVirtualCols) {
     if (ids.size() != names.size()) {
       LOG.warn("Read column counts do not match: "
           + ids.size() + " ids, " + names.size() + " names");
@@ -169,6 +170,7 @@ public final class ColumnProjectionUtils {
     appendReadColumns(conf, ids);
     appendReadColumnNames(conf, names);
     appendNestedColumnPaths(conf, groupPaths);
+    conf.setBoolean(FETCH_VIRTUAL_COLUMNS_CONF_STR, fetchVirtualCols);
   }
 
   public static void appendReadColumns(
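For illustration, a minimal sketch of the new appendReadColumns() overload introduced above; the column ids and names are made-up values:

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

    public class AppendReadColumnsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);

        // Appends projection info and records whether virtual columns
        // should be fetched on demand.
        ColumnProjectionUtils.appendReadColumns(conf,
            Arrays.asList(0, 1),              // column ids
            Arrays.asList("a", "b"),          // column names
            Collections.<String>emptyList(),  // no nested column paths
            true);                            // fetch virtual columns

        System.out.println(conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); // a,b
        System.out.println(conf.getBoolean(
            ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false)); // true
      }
    }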
