This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c7f6043322 HIVE-26136: Implement UPDATE statements for Iceberg tables 
(Peter Vary reviewed by Laszlo Pinter) (#3204)
c7f6043322 is described below

commit c7f60433226b125af5ae281d0fb14a2e937d0787
Author: pvary <[email protected]>
AuthorDate: Mon May 9 07:36:12 2022 +0200

    HIVE-26136: Implement UPDATE statements for Iceberg tables (Peter Vary 
reviewed by Laszlo Pinter) (#3204)
---
 .../org/apache/iceberg/mr/hive/FilesForCommit.java |  13 ++
 .../mr/hive/HiveIcebergOutputCommitter.java        | 128 ++++++++-------
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |  11 +-
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |  46 +++---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  51 ++++--
 .../apache/iceberg/mr/hive/IcebergAcidUtil.java    |  16 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |   8 +-
 .../java/org/apache/iceberg/mr/TestHelper.java     |  23 +++
 .../apache/iceberg/mr/hive/TestHiveIcebergV2.java  | 180 +++++++++++++++++++++
 .../org/apache/iceberg/mr/hive/TestHiveShell.java  |   1 +
 .../org/apache/iceberg/mr/hive/TestTables.java     |   9 ++
 .../positive/update_iceberg_partitioned_avro.q     |  26 +++
 .../positive/update_iceberg_partitioned_orc.q      |  26 +++
 .../positive/update_iceberg_partitioned_parquet.q  |  26 +++
 .../update_iceberg_unpartitioned_parquet.q         |  26 +++
 .../positive/delete_iceberg_partitioned_avro.q.out |   4 +-
 .../positive/delete_iceberg_partitioned_orc.q.out  |   4 +-
 .../delete_iceberg_partitioned_parquet.q.out       |   4 +-
 .../delete_iceberg_unpartitioned_parquet.q.out     |   4 +-
 ...q.out => update_iceberg_partitioned_avro.q.out} |  64 +++++---
 ....q.out => update_iceberg_partitioned_orc.q.out} |  64 +++++---
 ...ut => update_iceberg_partitioned_parquet.q.out} |  64 +++++---
 ... => update_iceberg_unpartitioned_parquet.q.out} |  64 +++++---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |  19 +++
 .../hive/ql/metadata/HiveStorageHandler.java       |  12 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 116 ++++++++-----
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java     |  47 ++++--
 27 files changed, 809 insertions(+), 247 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 237ef55369..953edfd0d2 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -27,6 +27,7 @@ import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
 public class FilesForCommit implements Serializable {
 
@@ -61,4 +62,16 @@ public class FilesForCommit implements Serializable {
   public Collection<? extends ContentFile> allFiles() {
     return Stream.concat(dataFiles.stream(), 
deleteFiles.stream()).collect(Collectors.toList());
   }
+
+  public boolean isEmpty() {
+    return dataFiles.isEmpty() && deleteFiles.isEmpty();
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("dataFiles", dataFiles.toString())
+        .add("deleteFiles", deleteFiles.toString())
+        .toString();
+  }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 00edc3bd2e..0ea882a450 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -24,7 +24,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -33,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,7 +54,6 @@ import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.Util;
@@ -63,6 +62,7 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
@@ -252,11 +252,12 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
             // list jobLocation to get number of forCommit files
             // we do this because map/reduce num in jobConf is unreliable and 
we have no access to vertex status info
             int numTasks = listForCommits(jobConf, jobLocation).size();
-            Collection<FilesForCommit> results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+            FilesForCommit results = collectResults(numTasks, fileExecutor, 
table.location(), jobContext,
                 table.io(), false);
-            // Check if we have files already committed and remove data files 
if there are any
-            List<? extends ContentFile> files = 
results.stream().map(FilesForCommit::allFiles)
-                .flatMap(Collection::stream).collect(Collectors.toList());
+            // Check if we have files already written and remove data and 
delta files if there are any
+            Collection<ContentFile> files = 
Stream.concat(results.dataFiles().stream(), results.deleteFiles().stream())
+                .collect(Collectors.toList());
+
             if (files.size() > 0) {
               Tasks.foreach(files)
                   .retry(3)
@@ -330,61 +331,71 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : 
conf.getNumMapTasks();
     });
 
-    Collection<FilesForCommit> writeResults = collectResults(numTasks, 
executor, location, jobContext, io, true);
-    if (writeResults.isEmpty()) {
-      LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete: 
{}, since there were no new files to add",
-          table, jobContext.getJobID(), 
HiveIcebergStorageHandler.isDelete(conf, name));
-    } else {
-      if (HiveIcebergStorageHandler.isDelete(conf, name)) {
-        commitDelete(table, Optional.empty(), startTime, writeResults);
+    FilesForCommit writeResults = collectResults(numTasks, executor, location, 
jobContext, io, true);
+    if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+      if (writeResults.isEmpty()) {
+        LOG.info(
+            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, 
since there were no new files to add",
+            table, jobContext.getJobID(), 
HiveIcebergStorageHandler.isDelete(conf, name));
       } else {
-        boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE, 
false);
-        commitInsert(table, Optional.empty(), startTime, writeResults, 
isOverwrite);
+        commitWrite(table, startTime, writeResults);
       }
+    } else {
+      commitOverwrite(table, startTime, writeResults);
     }
   }
 
-  private void commitDelete(Table table, Optional<Transaction> txn, long 
startTime,
-      Collection<FilesForCommit> results) {
-    RowDelta append = 
txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
-    List<DeleteFile> deleteFiles = 
results.stream().map(FilesForCommit::deleteFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    deleteFiles.forEach(append::addDeletes);
-    append.commit();
-    LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
-        System.currentTimeMillis() - startTime, table, deleteFiles.size());
-    LOG.debug("Added delete files {}", deleteFiles);
+  /**
+   * Creates and commits an Iceberg change with the provided data and delete 
files.
+   * If there are no delete files then an Iceberg 'append' is created, 
otherwise Iceberg 'overwrite' is created.
+   * @param table The table we are changing
+   * @param startTime The start time of the commit - used only for logging
+   * @param results The object containing the new files we would like to add 
to the table
+   */
+  private void commitWrite(Table table, long startTime, FilesForCommit 
results) {
+    if (results.deleteFiles().isEmpty()) {
+      AppendFiles write = table.newAppend();
+      results.dataFiles().forEach(write::appendFile);
+      write.commit();
+    } else {
+      RowDelta write = table.newRowDelta();
+      results.dataFiles().forEach(write::addRows);
+      results.deleteFiles().forEach(write::addDeletes);
+      write.commit();
+    }
+
+    LOG.info("Write commit took {} ms for table: {} with {} data and {} delete 
file(s)",
+        System.currentTimeMillis() - startTime, table, 
results.dataFiles().size(), results.deleteFiles().size());
+    LOG.debug("Added files {}", results);
   }
 
-  private void commitInsert(Table table, Optional<Transaction> txn, long 
startTime,
-      Collection<FilesForCommit> results, boolean isOverwrite) {
-    List<DataFile> dataFiles = results.stream().map(FilesForCommit::dataFiles)
-        .flatMap(Collection::stream).collect(Collectors.toList());
-    if (isOverwrite) {
-      if (!dataFiles.isEmpty()) {
-        ReplacePartitions overwrite = 
txn.map(Transaction::newReplacePartitions).orElse(table.newReplacePartitions());
-        dataFiles.forEach(overwrite::addFile);
-        overwrite.commit();
-        LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime,
-            table, dataFiles.size());
-      } else if (table.spec().isUnpartitioned()) {
-        DeleteFiles deleteFiles = 
txn.map(Transaction::newDelete).orElse(table.newDelete());
-        deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
-        deleteFiles.commit();
-        LOG.info("Cleared table contents as part of empty overwrite for 
unpartitioned table. " +
-            "Commit took {} ms for table: {}", System.currentTimeMillis() - 
startTime, table);
-      }
-      LOG.debug("Overwrote partitions with files {}", dataFiles);
-    } else if (!dataFiles.isEmpty()) {
-      // Appending data files to the table
-      // We only create a new commit if there's something to append
-      AppendFiles append = 
txn.map(Transaction::newAppend).orElse(table.newAppend());
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Append commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
+  /**
+   * Creates and commits an Iceberg insert overwrite change with the provided 
data files.
+   * For unpartitioned tables the table content is replaced with the new data 
files. If no data files are provided
+   * then the unpartitioned table is truncated.
+   * For partitioned tables the relevant partitions are replaced with the new 
data files. If no data files are provided
+   * then the partitioned table remains unchanged.
+   * @param table The table we are changing
+   * @param startTime The start time of the commit - used only for logging
+   * @param results The object containing the new files
+   */
+  private void commitOverwrite(Table table, long startTime, FilesForCommit 
results) {
+    Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not 
handle deletes with overwrite");
+    if (!results.dataFiles().isEmpty()) {
+      ReplacePartitions overwrite = table.newReplacePartitions();
+      results.dataFiles().forEach(overwrite::addFile);
+      overwrite.commit();
+      LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime,
+          table, results.dataFiles().size());
+    } else if (table.spec().isUnpartitioned()) {
+      DeleteFiles deleteFiles = table.newDelete();
+      deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
+      deleteFiles.commit();
+      LOG.info("Cleared table contents as part of empty overwrite for 
unpartitioned table. " +
+          "Commit took {} ms for table: {}", System.currentTimeMillis() - 
startTime, table);
     }
+
+    LOG.debug("Overwrote partitions with files {}", results);
   }
 
   /**
@@ -464,22 +475,25 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
    * @param throwOnFailure If <code>true</code> then it throws an exception on 
failure
    * @return The aggregated write results, which include the committed data 
and delete files
    */
-  private static Collection<FilesForCommit> collectResults(int numTasks, 
ExecutorService executor, String location,
+  private static FilesForCommit collectResults(int numTasks, ExecutorService 
executor, String location,
       JobContext jobContext, FileIO io, boolean throwOnFailure) {
     JobConf conf = jobContext.getJobConf();
     // Reading the committed files. The assumption here is that the taskIds 
are generated in sequential order
     // starting from 0.
-    Collection<FilesForCommit> writeResults = new ConcurrentLinkedQueue<>();
+    Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
+    Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
     Tasks.range(numTasks)
         .throwFailureWhenFinished(throwOnFailure)
         .executeWith(executor)
         .retry(3)
         .run(taskId -> {
           String taskFileName = generateFileForCommitLocation(location, conf, 
jobContext.getJobID(), taskId);
-          writeResults.add(readFileForCommit(taskFileName, io));
+          FilesForCommit files = readFileForCommit(taskFileName, io);
+          dataFiles.addAll(files.dataFiles());
+          deleteFiles.addAll(files.deleteFiles());
         });
 
-    return writeResults;
+    return new FilesForCommit(dataFiles, deleteFiles);
   }
 
   /**
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index ea3b29b439..552bffa236 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -81,12 +81,19 @@ public class HiveIcebergOutputFormat<T> implements 
OutputFormat<NullWritable, Co
         .operationId(operationId)
         .build();
     String tableName = jc.get(Catalogs.NAME);
-    HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
-        null, null, null, schema);
     if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
+      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
+          null, null, null, schema);
       return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat, 
writerFactory, outputFileFactory, io,
           targetFileSize, taskAttemptID, tableName);
+    } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
+      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
+          null, null, null, null);
+      return new HiveIcebergUpdateWriter(schema, table.specs(), 
table.spec().specId(), fileFormat, writerFactory,
+          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
     } else {
+      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
+          null, null, null, schema);
       return new HiveIcebergRecordWriter(schema, table.specs(), 
table.spec().specId(), fileFormat, writerFactory,
           outputFileFactory, io, targetFileSize, taskAttemptID, tableName, 
false);
     }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index eecf1518cd..a4470058c7 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -128,40 +128,48 @@ public class HiveIcebergSerDe extends AbstractSerDe {
       }
     }
 
-    String tableName = serDeProperties.getProperty(Catalogs.NAME);
+    this.projectedSchema = projectedSchema(configuration, 
serDeProperties.getProperty(Catalogs.NAME), tableSchema);
+
+    // Currently ClusteredWriter is used which requires that records are 
ordered by partition keys.
+    // Here we ensure that SortedDynPartitionOptimizer will kick in and do the 
sorting.
+    // TODO: remove once we have both Fanout and ClusteredWriter available: 
HIVE-25948
+    HiveConf.setIntVar(configuration, 
HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, 1);
+    HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, 
"nonstrict");
+
+    try {
+      this.inspector = IcebergObjectInspector.create(projectedSchema);
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+  }
+
+  private static Schema projectedSchema(Configuration configuration, String 
tableName, Schema tableSchema) {
     if (HiveIcebergStorageHandler.isDelete(configuration, tableName)) {
       // when writing delete files, we should use the full delete schema
-      projectedSchema = 
IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
+      return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
+    } else if (HiveIcebergStorageHandler.isUpdate(configuration, tableName)) {
+      // when writing updates, we should use the full update schema (meta columns, old values, new values)
+      return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
     } else if (HiveIcebergStorageHandler.isWrite(configuration, tableName)) {
-      // when writing out data, we should not do projection pushdown
-      projectedSchema = tableSchema;
+      // when writing out data, we should not do projection push down
+      return tableSchema;
     } else {
       configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
       String[] selectedColumns = 
ColumnProjectionUtils.getReadColumnNames(configuration);
       // When same table is joined multiple times, it is possible some 
selected columns are duplicated,
       // in this case wrong recordStructField position leads wrong value or 
ArrayIndexOutOfBoundException
       String[] distinctSelectedColumns = 
Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
-      projectedSchema = distinctSelectedColumns.length > 0 ?
-              tableSchema.caseInsensitiveSelect(distinctSelectedColumns) : 
tableSchema;
+      Schema projectedSchema = distinctSelectedColumns.length > 0 ?
+          tableSchema.caseInsensitiveSelect(distinctSelectedColumns) : 
tableSchema;
       // the input split mapper handles does not belong to this table
       // it is necessary to ensure projectedSchema equals to tableSchema,
       // or we cannot find selectOperator's column from inspector
       if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
-        projectedSchema = tableSchema;
+        return tableSchema;
+      } else {
+        return projectedSchema;
       }
     }
-
-    // Currently ClusteredWriter is used which requires that records are 
ordered by partition keys.
-    // Here we ensure that SortedDynPartitionOptimizer will kick in and do the 
sorting.
-    // TODO: remove once we have both Fanout and ClusteredWriter available: 
HIVE-25948
-    HiveConf.setIntVar(configuration, 
HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, 1);
-    HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, 
"nonstrict");
-
-    try {
-      this.inspector = IcebergObjectInspector.create(projectedSchema);
-    } catch (Exception e) {
-      throw new SerDeException(e);
-    }
   }
 
   private void createTableForCTAS(Configuration configuration, Properties 
serDeProperties) {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 6fdddb9b34..39042691e8 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -355,7 +355,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
       throws SemanticException {
     // delete records are already clustered by partition spec id and the hash 
of the partition struct
     // there is no need to do any additional sorting based on partition columns
-    if (getOperationType().equals(Context.Operation.DELETE.name())) {
+    if (getOperationType().equals(Operation.DELETE.name())) {
       return null;
     }
 
@@ -380,12 +380,13 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
       fieldOrderMap.put(fields.get(i).name(), i);
     }
 
+    int offset = acidSelectColumns(hmsTable, 
Operation.valueOf(getOperationType())).size();
     for (PartitionTransformSpec spec : partitionTransformSpecs) {
       int order = fieldOrderMap.get(spec.getColumnName());
       if 
(PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
-        customSortExprs.add(BUCKET_SORT_EXPR.apply(order, 
spec.getTransformParam().get()));
+        customSortExprs.add(BUCKET_SORT_EXPR.apply(order + offset, 
spec.getTransformParam().get()));
       } else {
-        customSortExprs.add(cols -> cols.get(order).clone());
+        customSortExprs.add(cols -> cols.get(order + offset).clone());
       }
     }
 
@@ -490,17 +491,30 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   }
 
   @Override
-  public List<FieldSchema> 
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
-    // TODO: make it configurable whether we want to include the table columns 
in the select query
-    // it might make delete writes faster if we don't have to write out the 
row object
-    return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+  public List<FieldSchema> 
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation 
operation) {
+    switch (operation) {
+      case DELETE:
+      case UPDATE:
+        // TODO: make it configurable whether we want to include the table 
columns in the select query.
+        // It might make delete writes faster if we don't have to write out 
the row object
+        return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+      default:
+        return ImmutableList.of();
+    }
   }
 
   @Override
-  public List<FieldSchema> 
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
-    return ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
+  public List<FieldSchema> 
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation 
operation) {
+    switch (operation) {
+      case DELETE:
+        return ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
+      default:
+        // For update operations we use the same sort order defined by
+        // {@link #createDPContext(HiveConf, 
org.apache.hadoop.hive.ql.metadata.Table)}
+        return ImmutableList.of();
+    }
   }
 
   private void setCommonJobConf(JobConf jobConf) {
@@ -543,12 +557,17 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   }
 
   public static boolean isWrite(Configuration conf, String tableName) {
-    return conf != null && tableName != null && 
Context.Operation.OTHER.name().equals(
+    return conf != null && tableName != null && Operation.OTHER.name().equals(
         conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
   }
 
   public static boolean isDelete(Configuration conf, String tableName) {
-    return conf != null && tableName != null && 
Context.Operation.DELETE.name().equals(
+    return conf != null && tableName != null && Operation.DELETE.name().equals(
+        conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
+  }
+
+  public static boolean isUpdate(Configuration conf, String tableName) {
+    return conf != null && tableName != null && Operation.UPDATE.name().equals(
         conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
   }
 
@@ -788,8 +807,8 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   }
 
   private String getOperationType() {
-    return SessionStateUtil.getProperty(conf, 
Context.Operation.class.getSimpleName())
-        .orElse(Context.Operation.OTHER.name());
+    return SessionStateUtil.getProperty(conf, Operation.class.getSimpleName())
+        .orElse(Operation.OTHER.name());
   }
 
   private static class NonSerializingConfig implements Serializable {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index faa2ca3ec1..73fe9743e7 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -115,20 +115,20 @@ public class IcebergAcidUtil {
    * @return The schema for reading files, extended with metadata columns 
needed for deletes
    */
   public static Schema createFileReadSchemaForUpdate(List<Types.NestedField> 
dataCols, Table table) {
-    List<Types.NestedField> cols = 
Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
-    SERDE_META_COLS.forEach((metaCol, index) -> {
+    List<Types.NestedField> cols = 
Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
+    FILE_READ_META_COLS.forEach((metaCol, index) -> {
       if (metaCol == PARTITION_STRUCT_META_COL) {
         cols.add(MetadataColumns.metadataColumn(table, 
MetadataColumns.PARTITION_COLUMN_NAME));
       } else {
         cols.add(metaCol);
       }
     });
-    // New column values
-    cols.addAll(dataCols);
     // Old column values
     cols.addAll(dataCols.stream()
         .map(f -> Types.NestedField.optional(1147483545 + f.fieldId(), 
"__old_value_for" + f.name(), f.type()))
         .collect(Collectors.toList()));
+    // New column values
+    cols.addAll(dataCols);
     return new Schema(cols);
   }
 
@@ -139,12 +139,12 @@ public class IcebergAcidUtil {
   public static Schema createSerdeSchemaForUpdate(List<Types.NestedField> 
dataCols) {
     List<Types.NestedField> cols = 
Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
     SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
-    // New column values
-    cols.addAll(dataCols);
     // Old column values
     cols.addAll(dataCols.stream()
         .map(f -> Types.NestedField.optional(1147483545 + f.fieldId(), 
"__old_value_for_" + f.name(), f.type()))
         .collect(Collectors.toList()));
+    // New column values
+    cols.addAll(dataCols);
     return new Schema(cols);
   }
 
@@ -154,7 +154,7 @@ public class IcebergAcidUtil {
    * @param original The record object to populate. The end result is the 
original record before the update.
    */
   public static void populateWithOriginalValues(Record rec, Record original) {
-    int dataOffset = SERDE_META_COLS.size() + original.size();
+    int dataOffset = SERDE_META_COLS.size();
     for (int i = dataOffset; i < dataOffset + original.size(); ++i) {
       original.set(i - dataOffset, rec.get(i));
     }
@@ -166,7 +166,7 @@ public class IcebergAcidUtil {
    * @param newRecord The record object to populate. The end result is the new 
record after the update.
    */
   public static void populateWithNewValues(Record rec, Record newRecord) {
-    int dataOffset = SERDE_META_COLS.size();
+    int dataOffset = SERDE_META_COLS.size() + newRecord.size();
     for (int i = dataOffset; i < dataOffset + newRecord.size(); ++i) {
       newRecord.set(i - dataOffset, rec.get(i));
     }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 085491ffa8..82636ff95a 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -264,7 +264,8 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
       while (true) {
         if (currentIterator.hasNext()) {
           current = currentIterator.next();
-          if (HiveIcebergStorageHandler.isDelete(conf, 
conf.get(Catalogs.NAME))) {
+          if (HiveIcebergStorageHandler.isDelete(conf, 
conf.get(Catalogs.NAME)) ||
+              HiveIcebergStorageHandler.isUpdate(conf, 
conf.get(Catalogs.NAME))) {
             GenericRecord rec = (GenericRecord) current;
             PositionDeleteInfo.setIntoConf(conf,
                 IcebergAcidUtil.parseSpecId(rec),
@@ -512,6 +513,11 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
         readSchema = 
IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
       }
 
+      // for UPDATE queries, add additional metadata columns into the read 
schema
+      if (HiveIcebergStorageHandler.isUpdate(conf, conf.get(Catalogs.NAME))) {
+        readSchema = 
IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
+      }
+
       return readSchema;
     }
 
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
index 25c04aebfd..df61cfd8c3 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -151,6 +151,29 @@ public class TestHelper {
     return appender().writeFile(partition, records);
   }
 
+  /** Writes the rows into one data file per partition, keyed by data file. */
+  public Map<DataFile, List<Record>> writeFiles(List<Record> rowSet) throws 
IOException {
+    // The rows collected by partitions
+    Map<PartitionKey, List<Record>> rows = Maps.newHashMap();
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    for (Record record : rowSet) {
+      partitionKey.partition(record);
+      List<Record> partitionRows = rows.get(partitionKey);
+      if (partitionRows == null) {
+        // Map.put returns the previous value (null here), not the new list
+        partitionRows = Lists.newArrayList();
+        rows.put(partitionKey.copy(), partitionRows);
+      }
+      partitionRows.add(record);
+    }
+    // Write out the partitions one-by-one
+    Map<DataFile, List<Record>> dataFiles = 
Maps.newHashMapWithExpectedSize(rows.size());
+    for (PartitionKey partition : rows.keySet()) {
+      dataFiles.put(writeFile(partition, rows.get(partition)), 
rows.get(partition));
+    }
+    return dataFiles;
+  }
+
   private GenericAppenderHelper appender() {
     return new GenericAppenderHelper(table, fileFormat, tmp, conf);
   }
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
index 1c9d3e1922..016fea5a09 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
@@ -383,6 +383,186 @@ public class TestHiveIcebergV2 extends 
HiveIcebergStorageHandlerWithEngineBase {
     }
   }
 
+  @Test
+  public void testUpdateStatementUnpartitioned() {
+    // create an unpartitioned table and insert an initial batch of records
+    testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    // insert one more batch so that the table has multiple data files
+    
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
+        TableIdentifier.of("default", "customers"), false));
+    // statement under test: rows matching either side of the OR are updated
+    shell.executeStatement("UPDATE customers SET last_name='Changed' WHERE 
customer_id=3 or first_name='Joanna'");
+
+    List<Object[]> objects =
+        shell.executeStatement("SELECT * FROM customers ORDER BY customer_id, 
last_name, first_name");
+    Assert.assertEquals(12, objects.size());
+    List<Record> expected = 
TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(1L, "Joanna", "Changed")
+        .add(1L, "Sharon", "Taylor")
+        .add(2L, "Joanna", "Changed")
+        .add(2L, "Jake", "Donnel")
+        .add(2L, "Susan", "Morrison")
+        .add(2L, "Bob", "Silver")
+        .add(3L, "Blake", "Changed")
+        .add(3L, "Marci", "Changed")
+        .add(3L, "Trudy", "Changed")
+        .add(3L, "Trudy", "Changed")
+        .add(4L, "Laci", "Zold")
+        .add(5L, "Peti", "Rozsaszin")
+        .build();
+    HiveIcebergTestUtils.validateData(expected,
+        
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
 objects), 0);
+  }
+
+  @Test
+  public void testUpdateStatementPartitioned() {
+    PartitionSpec spec = 
PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("last_name").bucket("customer_id", 16).build();
+
+    // create a partitioned table and insert an initial batch of records
+    testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    // insert one more batch so that we have multiple data files within the 
same partition
+    
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
+        TableIdentifier.of("default", "customers"), false));
+    // the SET clause changes last_name, an identity partition column
+    shell.executeStatement("UPDATE customers SET last_name='Changed' WHERE 
customer_id=3 or first_name='Joanna'");
+
+    List<Object[]> objects =
+        shell.executeStatement("SELECT * FROM customers ORDER BY customer_id, 
last_name, first_name");
+    Assert.assertEquals(12, objects.size());
+    List<Record> expected = 
TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(1L, "Joanna", "Changed")
+        .add(1L, "Sharon", "Taylor")
+        .add(2L, "Joanna", "Changed")
+        .add(2L, "Jake", "Donnel")
+        .add(2L, "Susan", "Morrison")
+        .add(2L, "Bob", "Silver")
+        .add(3L, "Blake", "Changed")
+        .add(3L, "Marci", "Changed")
+        .add(3L, "Trudy", "Changed")
+        .add(3L, "Trudy", "Changed")
+        .add(4L, "Laci", "Zold")
+        .add(5L, "Peti", "Rozsaszin")
+        .build();
+    HiveIcebergTestUtils.validateData(expected,
+        
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
 objects), 0);
+  }
+
+  @Test
+  public void testUpdateStatementWithOtherTable() {
+    PartitionSpec spec = 
PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("last_name").bucket("customer_id", 16).build();
+
+    // create a couple of tables, with an initial batch of records
+    testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    testTables.createTable(shell, "other", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1, 2);
+    // both subqueries read the table that is being updated
+    shell.executeStatement("UPDATE customers SET last_name='Changed' WHERE 
customer_id in " +
+        "(select t1.customer_id from customers t1 join other t2 on 
t1.customer_id = t2.customer_id) or " +
+        "first_name in (select first_name from customers where first_name = 
'Bob')");
+    // tie-break on first_name for rows sharing customer_id + last_name
+    List<Object[]> objects =
+        shell.executeStatement("SELECT * FROM customers ORDER BY customer_id, 
last_name, first_name");
+    Assert.assertEquals(9, objects.size());
+    List<Record> expected = 
TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(1L, "Joanna", "Pierce")
+        .add(1L, "Sharon", "Taylor")
+        .add(2L, "Bob", "Changed")
+        .add(2L, "Jake", "Donnel")
+        .add(2L, "Susan", "Morrison")
+        .add(2L, "Joanna", "Silver")
+        .add(3L, "Blake", "Changed")
+        .add(3L, "Trudy", "Changed")
+        .add(3L, "Trudy", "Changed")
+        .build();
+    HiveIcebergTestUtils.validateData(expected,
+        
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
 objects), 0);
+  }
+
+  @Test
+  public void testUpdateStatementWithPartitionAndSchemaEvolution() {
+    PartitionSpec spec = 
PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("last_name").bucket("customer_id", 16).build();
+
+    // create a partitioned table and insert an initial batch of records
+    testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    // insert one more batch so that we have multiple data files within the 
same partition
+    
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
+        TableIdentifier.of("default", "customers"), false));
+
+    // change the partition spec + schema, and insert some new records
+    shell.executeStatement("ALTER TABLE customers SET PARTITION SPEC 
(bucket(64, last_name))");
+    shell.executeStatement("ALTER TABLE customers ADD COLUMNS (department 
string)");
+    shell.executeStatement("ALTER TABLE customers CHANGE COLUMN first_name 
given_name string first");
+    shell.executeStatement("INSERT INTO customers VALUES ('Natalie', 20, 
'Bloom', 'Finance'), ('Joanna', 22, " +
+        "'Huberman', 'Operations')");
+
+    // update should handle changing records both from older specs and from 
the new spec without problems
+    // there are records with Joanna in both the old and new spec, as well as 
the old and new schema
+    shell.executeStatement("UPDATE customers set last_name='Changed' WHERE 
customer_id=3 or given_name='Joanna'");
+
+    List<Object[]> objects =
+        shell.executeStatement("SELECT * FROM customers ORDER BY customer_id, 
last_name, given_name");
+    Assert.assertEquals(14, objects.size());
+    // schema after the ALTERs: given_name moved first, department added
+    Schema newSchema = new Schema(
+        optional(2, "given_name", Types.StringType.get()),
+        optional(1, "customer_id", Types.LongType.get()),
+        optional(3, "last_name", Types.StringType.get(), "This is last name"),
+        optional(4, "department", Types.StringType.get())
+    );
+    List<Record> expected = TestHelper.RecordsBuilder.newInstance(newSchema)
+        .add("Joanna", 1L, "Changed", null)
+        .add("Sharon", 1L, "Taylor", null)
+        .add("Joanna", 2L, "Changed", null)
+        .add("Jake", 2L, "Donnel", null)
+        .add("Susan", 2L, "Morrison", null)
+        .add("Bob", 2L, "Silver", null)
+        .add("Blake", 3L, "Changed", null)
+        .add("Marci", 3L, "Changed", null)
+        .add("Trudy", 3L, "Changed", null)
+        .add("Trudy", 3L, "Changed", null)
+        .add("Laci", 4L, "Zold", null)
+        .add("Peti", 5L, "Rozsaszin", null)
+        .add("Natalie", 20L, "Bloom", "Finance")
+        .add("Joanna", 22L, "Changed", "Operations")
+        .build();
+    HiveIcebergTestUtils.validateData(expected, 
HiveIcebergTestUtils.valueForRow(newSchema, objects), 0);
+  }
+
+  @Test
+  public void testUpdateForSupportedTypes() throws IOException {
+    for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
+      Type type = SUPPORTED_TYPES.get(i);
+
+      // TODO: remove this filter when issue #1881 is resolved
+      if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
+        continue;
+      }
+
+      // TODO: remove this filter when we figure out how we could test binary 
types
+      if (type == Types.BinaryType.get() || 
type.equals(Types.FixedType.ofLength(5))) {
+        continue;
+      }
+      // single-column table per type; table name carries the loop index
+      String tableName = type.typeId().toString().toLowerCase() + "_table_" + 
i;
+      String columnName = type.typeId().toString().toLowerCase() + "_column";
+
+      Schema schema = new Schema(required(1, columnName, type));
+      List<Record> originalRecords = TestHelper.generateRandomRecords(schema, 
1, 0L);
+      Table table = testTables.createTable(shell, tableName, schema, 
fileFormat, originalRecords, 2);
+      // getUpdateQuery emits no WHERE clause, so every row is overwritten
+      List<Record> newRecords = TestHelper.generateRandomRecords(schema, 1, 
3L);
+      shell.executeStatement(testTables.getUpdateQuery(tableName, 
newRecords.get(0)));
+      HiveIcebergTestUtils.validateData(table, newRecords, 0);
+    }
+  }
+
   private static <T> PositionDelete<T> positionDelete(CharSequence path, long 
pos, T row) {
     PositionDelete<T> positionDelete = PositionDelete.create();
     return positionDelete.set(path, pos, row);
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index 7f43dc28e7..d05d0bf250 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -205,6 +205,7 @@ public class TestHiveShell {
     // Disable vectorization for HiveIcebergInputFormat
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
 
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
     hiveConf.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname, 
DbTxnManager.class.getName());
     hiveConf.set(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, 
SQLStdHiveAuthorizerFactory.class.getName());
 
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4bf7f88652..91c70dde62 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -316,6 +316,15 @@ abstract class TestTables {
     return query.toString();
   }
 
+  /** Builds an UPDATE (no WHERE) assigning every field of the record. */
+  public String getUpdateQuery(String tableName, Record record) {
+    // one "name=value" pair per field, value via getStringValueForInsert
+    String assignments = record.struct().fields().stream()
+        .map(column -> column.name() + "=" + 
getStringValueForInsert(record.getField(column.name()), column.type()))
+        .collect(Collectors.joining(","));
+    return "UPDATE " + tableName + " SET " + assignments;
+  }
+
   /**
    * Creates a Hive test table. Creates the Iceberg table/data and creates the 
corresponding Hive table as well when
    * needed. The table will be in the 'default' database. The table will be 
populated with the provided with randomly
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_avro.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_avro.q
new file mode 100644
index 0000000000..648b422527
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_avro.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) partitioned by spec 
(bucket(16, a), truncate(3, b)) stored by iceberg stored as avro tblproperties 
('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), 
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changes' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another iceberg table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_orc.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_orc.q
new file mode 100644
index 0000000000..a8ddb7fc3e
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_orc.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) partitioned by spec 
(bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties 
('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), 
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another iceberg table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_parquet.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_parquet.q
new file mode 100644
index 0000000000..c207f956c5
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_parquet.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) partitioned by spec 
(bucket(16, a), truncate(3, b)) stored by iceberg tblproperties 
('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), 
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another iceberg table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_unpartitioned_parquet.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_unpartitioned_parquet.q
new file mode 100644
index 0000000000..ecc7ce02e6
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_unpartitioned_parquet.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) stored by iceberg 
tblproperties ('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), 
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another iceberg table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
index 2a6ca684ed..fb85bad5f1 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
index b31e6322a2..f8641e4b09 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
index 5535baa16a..e5199c19c6 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
index e0355ce680..02d06c9097 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
similarity index 72%
copy from 
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
copy to 
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
index 2a6ca684ed..351011cb47 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), 
(2, 'two', 51), (3,
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or a 
= 22
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or 
a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changes 50
 2      two     51
 3      three   52
+4      Changes 53
 5      five    54
+111    Changes 55
 333    two     56
 PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 
801)
 PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from 
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a 
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changes 55
 333    two     56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_ice_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10, 
'ten'), (333, 'hundred')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a 
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select 
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changes 55
+333    Changed forever 56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_standard_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_standard_other
 POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
 POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changes 55
+333    Changed forever 56
+444    The last one    800
+555    Changed again   801
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
similarity index 72%
copy from 
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
copy to 
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
index b31e6322a2..4fbe5c0500 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), 
(2, 'two', 51), (3,
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a 
= 22
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or 
a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed 50
 2      two     51
 3      three   52
+4      Changed 53
 5      five    54
+111    Changed 55
 333    two     56
 PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 
801)
 PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from 
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a 
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
 333    two     56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_ice_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10, 
'ten'), (333, 'hundred')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a 
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select 
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
+333    Changed forever 56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_standard_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_standard_other
 POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
 POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
+333    Changed forever 56
+444    The last one    800
+555    Changed again   801
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
similarity index 71%
copy from 
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
copy to 
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
index 5535baa16a..46913856ed 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), 
(2, 'two', 51), (3,
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a 
= 22
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or 
a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed 50
 2      two     51
 3      three   52
+4      Changed 53
 5      five    54
+111    Changed 55
 333    two     56
 PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 
801)
 PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from 
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a 
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
 333    two     56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_ice_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10, 
'ten'), (333, 'hundred')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a 
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select 
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
+333    Changed forever 56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_standard_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_standard_other
 POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
 POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
+333    Changed forever 56
+444    The last one    800
+555    Changed again   801
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
similarity index 71%
copy from 
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
copy to 
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
index e0355ce680..1b0d9d5bcd 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), 
(2, 'two', 51), (3,
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a 
= 22
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or 
a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed 50
 2      two     51
 3      three   52
+4      Changed 53
 5      five    54
+111    Changed 55
 333    two     56
 PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 
801)
 PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 
800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[51][tables = [$hdt$_0, $hdt$_1]] in Stage 
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1, $hdt$_2, 
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from 
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a 
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a 
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
 333    two     56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_ice_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10, 
'ten'), (333, 'hundred')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a 
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select 
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
+333    Changed forever 56
 444    hola    800
+555    Changed again   801
 PREHOOK: query: drop table if exists tbl_standard_other
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_standard_other
 POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
 POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1 
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a 
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1      Changed again   50
+2      Changed again   51
+3      Changed again   52
+4      Changed again   53
+5      Changed again   54
+111    Changed 55
+333    Changed forever 56
+444    The last one    800
+555    Changed again   801
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 3a83855016..70fa42bad6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -50,6 +50,7 @@ import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -98,6 +99,7 @@ import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
@@ -3330,6 +3332,23 @@ public class AcidUtils {
         table.getStorageHandler().supportsAcidOperations() != 
HiveStorageHandler.AcidSupportType.NONE;
   }
 
+  /**
+   * Returns the virtual columns needed for update queries. For ACID queries 
it is a single ROW__ID, for non-native
+   * tables the list is provided by the {@link 
HiveStorageHandler#acidVirtualColumns()}.
+   * @param table The table for which we run the query
+   * @return The list of virtual columns used
+   */
+  public static List<VirtualColumn> getAcidVirtualColumns(Table table) {
+    if (isTransactionalTable(table)) {
+      return Lists.newArrayList(VirtualColumn.ROWID);
+    } else {
+      if (isNonNativeAcidTable(table)) {
+        return table.getStorageHandler().acidVirtualColumns();
+      }
+    }
+    return Collections.emptyList();
+  }
+
   public static boolean acidTableWithoutTransactions(Table table) {
     return table != null && table.getStorageHandler() != null &&
         table.getStorageHandler().supportsAcidOperations() == 
HiveStorageHandler.AcidSupportType.WITHOUT_TRANSACTIONS;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 0784fe65ad..8d75db400b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
@@ -307,8 +308,11 @@ public interface HiveStorageHandler extends Configurable {
 
   /**
    * {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer} 
rewrites DELETE/UPDATE queries into INSERT
-   * queries. E.g. DELETE FROM T WHERE A = 32 is rewritten into
+   * queries.
+   * - DELETE FROM T WHERE A = 32 is rewritten into
    * INSERT INTO T SELECT &lt;selectCols&gt; FROM T WHERE A = 32 SORT BY 
&lt;sortCols&gt;.
+   * - UPDATE T SET B=12 WHERE A = 32 is rewritten into
+   * INSERT INTO T SELECT &lt;selectCols&gt;, &lt;newValues&gt; FROM T WHERE A 
= 32 SORT BY &lt;sortCols&gt;.
    *
    * This method specifies which columns should be injected into the 
&lt;selectCols&gt; part of the rewritten query.
    *
@@ -316,9 +320,10 @@ public interface HiveStorageHandler extends Configurable {
    * other NONE.
    *
    * @param table the table which is being deleted/updated/merged into
+   * @param operation the operation type we are executing
    * @return the list of columns that should be projected in the rewritten 
ACID query
    */
-  default List<FieldSchema> 
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
+  default List<FieldSchema> 
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation 
operation) {
     return Collections.emptyList();
   }
 
@@ -333,9 +338,10 @@ public interface HiveStorageHandler extends Configurable {
    * other NONE.
    *
    * @param table the table which is being deleted/updated/merged into
+   * @param operation the operation type we are executing
    * @return the list of columns that should be used as sort columns in the 
rewritten ACID query
    */
-  default List<FieldSchema> 
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
+  default List<FieldSchema> 
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation 
operation) {
     return Collections.emptyList();
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 22d6d63e04..9003984567 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -53,6 +53,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
@@ -6912,7 +6913,8 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, 
table_desc, input, false);
       }
     } else {
-      if (updating(dest) || deleting(dest)) {
+      // Non-native acid tables should handle their own bucketing for 
updates/deletes
+      if ((updating(dest) || deleting(dest)) && 
!AcidUtils.isNonNativeAcidTable(dest_tab)) {
         partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
         enforceBucketing = true;
       }
@@ -7320,7 +7322,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       input = genConstraintsPlan(dest, qb, input);
 
       if (!qb.getIsQuery()) {
-        input = genConversionSelectOperator(dest, qb, input, 
destinationTable.getDeserializer(), dpCtx, parts);
+        input = genConversionSelectOperator(dest, qb, input, 
destinationTable.getDeserializer(), dpCtx, parts, destinationTable);
       }
 
       if (destinationTable.isMaterializedView() &&
@@ -7467,7 +7469,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       input = genConstraintsPlan(dest, qb, input);
 
       if (!qb.getIsQuery()) {
-        input = genConversionSelectOperator(dest, qb, input, 
destinationTable.getDeserializer(), dpCtx, null);
+        input = genConversionSelectOperator(dest, qb, input, 
destinationTable.getDeserializer(), dpCtx, null, destinationTable);
       }
 
       if (destinationTable.isMaterializedView() &&
@@ -8437,7 +8439,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
    * types that are expected by the table_desc.
    */
   private Operator genConversionSelectOperator(String dest, QB qb, Operator 
input,
-      Deserializer deserializer, DynamicPartitionCtx dpCtx, List<FieldSchema> 
parts)
+      Deserializer deserializer, DynamicPartitionCtx dpCtx, List<FieldSchema> 
parts, Table table)
       throws SemanticException {
     StructObjectInspector oi = null;
     try {
@@ -8466,7 +8468,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     }
 
     // Check column types
-    boolean converted = false;
+    AtomicBoolean converted = new AtomicBoolean(false);
     int columnNumber = tableFields.size();
     List<ExprNodeDesc> expressions = new ArrayList<ExprNodeDesc>(columnNumber);
 
@@ -8474,52 +8476,33 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     // does the conversion to String by itself.
     if (!(deserializer instanceof MetadataTypedColumnsetSerDe) && 
!deleting(dest)) {
 
-      // If we're updating, add the ROW__ID expression, then make the 
following column accesses
-      // offset by 1 so that we don't try to convert the ROW__ID
-      if (updating(dest)) {
-        expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
-            rowFields.get(0).getInternalName(), "", true));
+      // If we're updating, add the required virtual columns.
+      int virtualColumnSize = updating(dest) ? 
AcidUtils.getAcidVirtualColumns(table).size() : 0;
+      for (int i = 0; i < virtualColumnSize; i++) {
+        expressions.add(new ExprNodeColumnDesc(rowFields.get(i).getType(),
+            rowFields.get(i).getInternalName(), "", true));
       }
 
       // here only deals with non-partition columns. We deal with partition 
columns next
+      int rowFieldsOffset = expressions.size();
       for (int i = 0; i < columnNumber; i++) {
-        int rowFieldsOffset = updating(dest) ? i + 1 : i;
-        ObjectInspector tableFieldOI = tableFields.get(i)
-            .getFieldObjectInspector();
-        TypeInfo tableFieldTypeInfo = TypeInfoUtils
-            .getTypeInfoFromObjectInspector(tableFieldOI);
-        TypeInfo rowFieldTypeInfo = rowFields.get(rowFieldsOffset).getType();
-        ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo,
-            rowFields.get(rowFieldsOffset).getInternalName(), "", false,
-            rowFields.get(rowFieldsOffset).isSkewedCol());
-        // LazySimpleSerDe can convert any types to String type using
-        // JSON-format. However, we may add more operators.
-        // Thus, we still keep the conversion.
-        if (!tableFieldTypeInfo.equals(rowFieldTypeInfo)) {
-          // need to do some conversions here
-          converted = true;
-          if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) {
-            // cannot convert to complex types
-            column = null;
-          } else {
-            column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
-                .createConversionCast(column, 
(PrimitiveTypeInfo)tableFieldTypeInfo);
-          }
-          if (column == null) {
-            String reason = "Cannot convert column " + i + " from "
-                + rowFieldTypeInfo + " to " + tableFieldTypeInfo + ".";
-            throw new SemanticException(ASTErrorUtils.getMsg(
-                ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(),
-                qb.getParseInfo().getDestForClause(dest), reason));
-          }
-        }
+        ExprNodeDesc column = handleConversion(tableFields.get(i), 
rowFields.get(rowFieldsOffset + i), converted, dest, i);
         expressions.add(column);
       }
 
+      // For Non-Native ACID tables we should convert the new values as well
+      rowFieldsOffset = expressions.size();
+      if (updating(dest) && AcidUtils.isNonNativeAcidTable(table)) {
+        for (int i = 0; i < columnNumber; i++) {
+          ExprNodeDesc column = handleConversion(tableFields.get(i), 
rowFields.get(rowFieldsOffset + i), converted, dest, i);
+          expressions.add(column);
+        }
+      }
+
       // deal with dynamic partition columns
+      rowFieldsOffset = expressions.size();
       if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
         // rowFields contains non-partitioned columns (tableFields) followed 
by DP columns
-        int rowFieldsOffset = tableFields.size() + (updating(dest) ? 1 : 0);
         for (int dpColIdx = 0; dpColIdx < rowFields.size() - rowFieldsOffset; 
++dpColIdx) {
 
           // create ExprNodeDesc
@@ -8549,7 +8532,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
               if (!partitionTypeInfo.equals(inputTypeInfo)) {
                 column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
                     .createConversionCast(column, partitionTypeInfo);
-                converted = true;
+                converted.set(true);
               }
             } else {
               LOG.warn("Partition schema for dynamic partition " + 
inputColumn.getAlias() + " ("
@@ -8562,7 +8545,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
     }
 
-    if (converted) {
+    if (converted.get()) {
       // add the select operator
       RowResolver rowResolver = new RowResolver();
       List<String> colNames = new ArrayList<String>();
@@ -8582,6 +8565,53 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     return input;
   }
 
+  /**
+   * Creates an expression for converting a row column to the type of a table column. 
For example:
+   * if the table column is int but the query provides a string in the row, then 
we need to cast automatically.
+   * @param tableField The target table column
+   * @param rowField The source row column
+   * @param conversion The value of this boolean is set to true if we detect 
that a conversion is needed. This acts as a
+   *                   hidden return value, notifying the caller 
that a cast was needed.
+   * @param dest The destination table for the error message
+   * @param columnNum The destination column id for the error message
+   * @return The Expression describing the selected column. Note that 
`conversion` can be considered as a return value
+   *         as well
+   * @throws SemanticException If a conversion was needed, but automatic 
conversion is not available
+   */
+  private ExprNodeDesc handleConversion(StructField tableField, ColumnInfo 
rowField, AtomicBoolean conversion, String dest, int columnNum)
+      throws SemanticException {
+    ObjectInspector tableFieldOI = tableField
+        .getFieldObjectInspector();
+    TypeInfo tableFieldTypeInfo = TypeInfoUtils
+        .getTypeInfoFromObjectInspector(tableFieldOI);
+    TypeInfo rowFieldTypeInfo = rowField.getType();
+    ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo,
+        rowField.getInternalName(), "", false,
+        rowField.isSkewedCol());
+    // LazySimpleSerDe can convert any types to String type using
+    // JSON-format. However, we may add more operators.
+    // Thus, we still keep the conversion.
+    if (!tableFieldTypeInfo.equals(rowFieldTypeInfo)) {
+      // need to do some conversions here
+      conversion.set(true);
+      if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+        // cannot convert to complex types
+        column = null;
+      } else {
+        column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
+            .createConversionCast(column, 
(PrimitiveTypeInfo)tableFieldTypeInfo);
+      }
+      if (column == null) {
+        String reason = "Cannot convert column " + columnNum + " from "
+            + rowFieldTypeInfo + " to " + tableFieldTypeInfo + ".";
+        throw new SemanticException(ASTErrorUtils.getMsg(
+            ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(),
+            qb.getParseInfo().getDestForClause(dest), reason));
+      }
+    }
+    return column;
+  }
+
   @SuppressWarnings("nls")
   private Operator genLimitPlan(String dest, Operator input, int offset, int 
limit) {
     // A map-only job can be optimized - instead of converting it to a
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index deff4f3c20..70121096a8 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 
 /**
@@ -112,12 +111,17 @@ public class UpdateDeleteSemanticAnalyzer extends 
RewriteSemanticAnalyzer {
     addPartitionColsToInsert(mTable.getPartCols(), rewrittenQueryStr);
 
     boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
+    int columnOffset;
     if (nonNativeAcid) {
-      String selectCols = 
mTable.getStorageHandler().acidSelectColumns(mTable).stream()
-          .map(FieldSchema::getName).collect(Collectors.joining(","));
+      List<FieldSchema> acidColumns = 
mTable.getStorageHandler().acidSelectColumns(mTable, operation);
+      String selectCols = acidColumns.stream()
+          .map(fieldSchema -> 
HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf))
+          .collect(Collectors.joining(","));
       rewrittenQueryStr.append(" select ").append(selectCols);
+      columnOffset = acidColumns.size();
     } else {
       rewrittenQueryStr.append(" select ROW__ID");
+      columnOffset = 1;
     }
 
     Map<Integer, ASTNode> setColExprs = null;
@@ -143,7 +147,7 @@ public class UpdateDeleteSemanticAnalyzer extends 
RewriteSemanticAnalyzer {
           // This is one of the columns we're setting, record it's position so 
we can come back
           // later and patch it up.
           // Add one to the index because the select has the ROW__ID as the 
first column.
-          setColExprs.put(i + 1, setCol);
+          setColExprs.put(columnOffset + i, setCol);
         }
       }
     }
@@ -162,9 +166,11 @@ public class UpdateDeleteSemanticAnalyzer extends 
RewriteSemanticAnalyzer {
 
     // Add a sort by clause so that the row ids come out in the correct order
     if (nonNativeAcid) {
-      String sortCols = 
mTable.getStorageHandler().acidSortColumns(mTable).stream()
-          .map(FieldSchema::getName).collect(Collectors.joining(","));
-      rewrittenQueryStr.append(" sort by ").append(sortCols).append(" ");
+      List<FieldSchema> sortColumns = 
mTable.getStorageHandler().acidSortColumns(mTable, operation);
+      if (!sortColumns.isEmpty()) {
+        String sortCols = 
sortColumns.stream().map(FieldSchema::getName).collect(Collectors.joining(","));
+        rewrittenQueryStr.append(" sort by ").append(sortCols).append(" ");
+      }
     } else {
       rewrittenQueryStr.append(" sort by ROW__ID ");
     }
@@ -191,16 +197,29 @@ public class UpdateDeleteSemanticAnalyzer extends 
RewriteSemanticAnalyzer {
       //          \-> TOK_INSERT -> TOK_INSERT_INTO
       //                        \-> TOK_SELECT
       //                        \-> TOK_SORTBY
+      // Or
+      // TOK_QUERY -> TOK_FROM
+      //          \-> TOK_INSERT -> TOK_INSERT_INTO
+      //                        \-> TOK_SELECT
+      //
       // The following adds the TOK_WHERE and its subtree from the original 
query as a child of
       // TOK_INSERT, which is where it would have landed if it had been there 
originally in the
       // string.  We do it this way because it's easy then turning the 
original AST back into a
-      // string and reparsing it.  We have to move the SORT_BY over one,
-      // so grab it and then push it to the second slot, and put the where in 
the first slot
-      ASTNode sortBy = (ASTNode)rewrittenInsert.getChildren().get(2);
-      assert sortBy.getToken().getType() == HiveParser.TOK_SORTBY :
-          "Expected TOK_SORTBY to be first child of TOK_SELECT, but found " + 
sortBy.getName();
-      rewrittenInsert.addChild(sortBy);
-      rewrittenInsert.setChild(2, where);
+      // string and reparsing it.
+      if (rewrittenInsert.getChildren().size() == 3) {
+        // We have to move the SORT_BY over one, so grab it and then push it 
to the second slot,
+        // and put the where in the first slot
+        ASTNode sortBy = (ASTNode) rewrittenInsert.getChildren().get(2);
+        assert sortBy.getToken().getType() == HiveParser.TOK_SORTBY :
+            "Expected TOK_SORTBY to be third child of TOK_INSERT, but found " 
+ sortBy.getName();
+        rewrittenInsert.addChild(sortBy);
+        rewrittenInsert.setChild(2, where);
+      } else {
+        ASTNode select = (ASTNode) rewrittenInsert.getChildren().get(1);
+        assert select.getToken().getType() == HiveParser.TOK_SELECT :
+            "Expected TOK_SELECT to be second child of TOK_INSERT, but found " 
+ select.getName();
+        rewrittenInsert.addChild(where);
+      }
     }
 
     // Patch up the projection list for updates, putting back the original set 
expressions.

Reply via email to