This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c7f6043322 HIVE-26136: Implement UPDATE statements for Iceberg tables
(Peter Vary reviewed by Laszlo Pinter) (#3204)
c7f6043322 is described below
commit c7f60433226b125af5ae281d0fb14a2e937d0787
Author: pvary <[email protected]>
AuthorDate: Mon May 9 07:36:12 2022 +0200
HIVE-26136: Implement UPDATE statements for Iceberg tables (Peter Vary
reviewed by Laszlo Pinter) (#3204)
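For context, a minimal usage sketch of what this change enables, following the pattern of the new q-tests and TestHiveIcebergV2 cases in the diff below (the table and column names are illustrative, shell is the TestHiveShell instance used by the handler tests, and the target must be an Iceberg v2 table):

    // Illustrative only - mirrors the new tests added by this patch.
    shell.executeStatement("create external table customers (customer_id bigint, first_name string, last_name string) " +
        "stored by iceberg stored as orc tblproperties ('format-version'='2')");
    shell.executeStatement("insert into customers values (1, 'Joanna', 'Pierce'), (2, 'Bob', 'Silver')");
    shell.executeStatement("update customers set last_name='Changed' where customer_id = 1");

Under the hood the UPDATE is executed as a position delete of the old row version plus a write of the new row version, committed together in a single Iceberg row-delta snapshot (see HiveIcebergOutputCommitter below).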
---
.../org/apache/iceberg/mr/hive/FilesForCommit.java | 13 ++
.../mr/hive/HiveIcebergOutputCommitter.java | 128 ++++++++-------
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 11 +-
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 46 +++---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 51 ++++--
.../apache/iceberg/mr/hive/IcebergAcidUtil.java | 16 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 8 +-
.../java/org/apache/iceberg/mr/TestHelper.java | 23 +++
.../apache/iceberg/mr/hive/TestHiveIcebergV2.java | 180 +++++++++++++++++++++
.../org/apache/iceberg/mr/hive/TestHiveShell.java | 1 +
.../org/apache/iceberg/mr/hive/TestTables.java | 9 ++
.../positive/update_iceberg_partitioned_avro.q | 26 +++
.../positive/update_iceberg_partitioned_orc.q | 26 +++
.../positive/update_iceberg_partitioned_parquet.q | 26 +++
.../update_iceberg_unpartitioned_parquet.q | 26 +++
.../positive/delete_iceberg_partitioned_avro.q.out | 4 +-
.../positive/delete_iceberg_partitioned_orc.q.out | 4 +-
.../delete_iceberg_partitioned_parquet.q.out | 4 +-
.../delete_iceberg_unpartitioned_parquet.q.out | 4 +-
...q.out => update_iceberg_partitioned_avro.q.out} | 64 +++++---
....q.out => update_iceberg_partitioned_orc.q.out} | 64 +++++---
...ut => update_iceberg_partitioned_parquet.q.out} | 64 +++++---
... => update_iceberg_unpartitioned_parquet.q.out} | 64 +++++---
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 19 +++
.../hive/ql/metadata/HiveStorageHandler.java | 12 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 116 ++++++++-----
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 47 ++++--
27 files changed, 809 insertions(+), 247 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 237ef55369..953edfd0d2 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -27,6 +27,7 @@ import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
public class FilesForCommit implements Serializable {
@@ -61,4 +62,16 @@ public class FilesForCommit implements Serializable {
public Collection<? extends ContentFile> allFiles() {
return Stream.concat(dataFiles.stream(),
deleteFiles.stream()).collect(Collectors.toList());
}
+
+ public boolean isEmpty() {
+ return dataFiles.isEmpty() && deleteFiles.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("dataFiles", dataFiles.toString())
+ .add("deleteFiles", deleteFiles.toString())
+ .toString();
+ }
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 00edc3bd2e..0ea882a450 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -24,7 +24,6 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -33,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -54,7 +54,6 @@ import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.Util;
@@ -63,6 +62,7 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
@@ -252,11 +252,12 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
// list jobLocation to get number of forCommit files
// we do this because map/reduce num in jobConf is unreliable and
we have no access to vertex status info
int numTasks = listForCommits(jobConf, jobLocation).size();
- Collection<FilesForCommit> results = collectResults(numTasks,
fileExecutor, table.location(), jobContext,
+ FilesForCommit results = collectResults(numTasks, fileExecutor,
table.location(), jobContext,
table.io(), false);
- // Check if we have files already committed and remove data files
if there are any
- List<? extends ContentFile> files =
results.stream().map(FilesForCommit::allFiles)
- .flatMap(Collection::stream).collect(Collectors.toList());
+ // Check if we have files already written and remove data and delete files if there are any
+ Collection<ContentFile> files =
Stream.concat(results.dataFiles().stream(), results.deleteFiles().stream())
+ .collect(Collectors.toList());
+
if (files.size() > 0) {
Tasks.foreach(files)
.retry(3)
@@ -330,61 +331,71 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() :
conf.getNumMapTasks();
});
- Collection<FilesForCommit> writeResults = collectResults(numTasks,
executor, location, jobContext, io, true);
- if (writeResults.isEmpty()) {
- LOG.info("Not creating a new commit for table: {}, jobID: {}, isDelete:
{}, since there were no new files to add",
- table, jobContext.getJobID(),
HiveIcebergStorageHandler.isDelete(conf, name));
- } else {
- if (HiveIcebergStorageHandler.isDelete(conf, name)) {
- commitDelete(table, Optional.empty(), startTime, writeResults);
+ FilesForCommit writeResults = collectResults(numTasks, executor, location,
jobContext, io, true);
+ if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
+ if (writeResults.isEmpty()) {
+ LOG.info(
+ "Not creating a new commit for table: {}, jobID: {}, isDelete: {},
since there were no new files to add",
+ table, jobContext.getJobID(),
HiveIcebergStorageHandler.isDelete(conf, name));
} else {
- boolean isOverwrite = conf.getBoolean(InputFormatConfig.IS_OVERWRITE,
false);
- commitInsert(table, Optional.empty(), startTime, writeResults,
isOverwrite);
+ commitWrite(table, startTime, writeResults);
}
+ } else {
+ commitOverwrite(table, startTime, writeResults);
}
}
- private void commitDelete(Table table, Optional<Transaction> txn, long
startTime,
- Collection<FilesForCommit> results) {
- RowDelta append =
txn.map(Transaction::newRowDelta).orElse(table.newRowDelta());
- List<DeleteFile> deleteFiles =
results.stream().map(FilesForCommit::deleteFiles)
- .flatMap(Collection::stream).collect(Collectors.toList());
- deleteFiles.forEach(append::addDeletes);
- append.commit();
- LOG.info("Delete commit took {} ms for table: {} with {} delete file(s)",
- System.currentTimeMillis() - startTime, table, deleteFiles.size());
- LOG.debug("Added delete files {}", deleteFiles);
+ /**
+ * Creates and commits an Iceberg change with the provided data and delete files.
+ * If there are no delete files then an Iceberg 'append' is created, otherwise an Iceberg 'row delta' is created.
+ * @param table The table we are changing
+ * @param startTime The start time of the commit - used only for logging
+ * @param results The object containing the new files we would like to add
to the table
+ */
+ private void commitWrite(Table table, long startTime, FilesForCommit
results) {
+ if (results.deleteFiles().isEmpty()) {
+ AppendFiles write = table.newAppend();
+ results.dataFiles().forEach(write::appendFile);
+ write.commit();
+ } else {
+ RowDelta write = table.newRowDelta();
+ results.dataFiles().forEach(write::addRows);
+ results.deleteFiles().forEach(write::addDeletes);
+ write.commit();
+ }
+
+ LOG.info("Write commit took {} ms for table: {} with {} data and {} delete
file(s)",
+ System.currentTimeMillis() - startTime, table,
results.dataFiles().size(), results.deleteFiles().size());
+ LOG.debug("Added files {}", results);
}
- private void commitInsert(Table table, Optional<Transaction> txn, long
startTime,
- Collection<FilesForCommit> results, boolean isOverwrite) {
- List<DataFile> dataFiles = results.stream().map(FilesForCommit::dataFiles)
- .flatMap(Collection::stream).collect(Collectors.toList());
- if (isOverwrite) {
- if (!dataFiles.isEmpty()) {
- ReplacePartitions overwrite =
txn.map(Transaction::newReplacePartitions).orElse(table.newReplacePartitions());
- dataFiles.forEach(overwrite::addFile);
- overwrite.commit();
- LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)",
System.currentTimeMillis() - startTime,
- table, dataFiles.size());
- } else if (table.spec().isUnpartitioned()) {
- DeleteFiles deleteFiles =
txn.map(Transaction::newDelete).orElse(table.newDelete());
- deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
- deleteFiles.commit();
- LOG.info("Cleared table contents as part of empty overwrite for
unpartitioned table. " +
- "Commit took {} ms for table: {}", System.currentTimeMillis() -
startTime, table);
- }
- LOG.debug("Overwrote partitions with files {}", dataFiles);
- } else if (!dataFiles.isEmpty()) {
- // Appending data files to the table
- // We only create a new commit if there's something to append
- AppendFiles append =
txn.map(Transaction::newAppend).orElse(table.newAppend());
- dataFiles.forEach(append::appendFile);
- append.commit();
- LOG.info("Append commit took {} ms for table: {} with {} file(s)",
System.currentTimeMillis() - startTime, table,
- dataFiles.size());
- LOG.debug("Added files {}", dataFiles);
+ /**
+ * Creates and commits an Iceberg insert overwrite change with the provided data files.
+ * For unpartitioned tables the table content is replaced with the new data files. If no data files are provided
+ * then the unpartitioned table is truncated.
+ * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided
+ * then the partitioned table remains unchanged.
+ * @param table The table we are changing
+ * @param startTime The start time of the commit - used only for logging
+ * @param results The object containing the new files
+ */
+ private void commitOverwrite(Table table, long startTime, FilesForCommit
results) {
+ Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not
handle deletes with overwrite");
+ if (!results.dataFiles().isEmpty()) {
+ ReplacePartitions overwrite = table.newReplacePartitions();
+ results.dataFiles().forEach(overwrite::addFile);
+ overwrite.commit();
+ LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)",
System.currentTimeMillis() - startTime,
+ table, results.dataFiles().size());
+ } else if (table.spec().isUnpartitioned()) {
+ DeleteFiles deleteFiles = table.newDelete();
+ deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
+ deleteFiles.commit();
+ LOG.info("Cleared table contents as part of empty overwrite for
unpartitioned table. " +
+ "Commit took {} ms for table: {}", System.currentTimeMillis() -
startTime, table);
}
+
+ LOG.debug("Overwrote partitions with files {}", results);
}
/**
@@ -464,22 +475,25 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
* @param throwOnFailure If <code>true</code> then it throws an exception on
failure
* @return The list of the write results, which include the committed data
or delete files
*/
- private static Collection<FilesForCommit> collectResults(int numTasks,
ExecutorService executor, String location,
+ private static FilesForCommit collectResults(int numTasks, ExecutorService
executor, String location,
JobContext jobContext, FileIO io, boolean throwOnFailure) {
JobConf conf = jobContext.getJobConf();
// Reading the committed files. The assumption here is that the taskIds
are generated in sequential order
// starting from 0.
- Collection<FilesForCommit> writeResults = new ConcurrentLinkedQueue<>();
+ Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
+ Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
Tasks.range(numTasks)
.throwFailureWhenFinished(throwOnFailure)
.executeWith(executor)
.retry(3)
.run(taskId -> {
String taskFileName = generateFileForCommitLocation(location, conf,
jobContext.getJobID(), taskId);
- writeResults.add(readFileForCommit(taskFileName, io));
+ FilesForCommit files = readFileForCommit(taskFileName, io);
+ dataFiles.addAll(files.dataFiles());
+ deleteFiles.addAll(files.deleteFiles());
});
- return writeResults;
+ return new FilesForCommit(dataFiles, deleteFiles);
}
/**
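To make the commit decision described in the commitWrite() Javadoc easy to see in isolation, here is a condensed sketch of it (simplified from the patch; logging omitted, not a drop-in replacement): an insert-only job publishes an append snapshot, while a job that produced position deletes (DELETE/UPDATE) publishes a row-delta snapshot carrying both the new data files and the delete files.

    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;

    // Condensed from commitWrite() above.
    static void commitFiles(Table table, FilesForCommit results) {
      if (results.deleteFiles().isEmpty()) {
        // Insert-only: a plain append snapshot is sufficient
        AppendFiles append = table.newAppend();
        results.dataFiles().forEach(append::appendFile);
        append.commit();
      } else {
        // DELETE/UPDATE: publish data files and position delete files atomically
        RowDelta rowDelta = table.newRowDelta();
        results.dataFiles().forEach(rowDelta::addRows);
        results.deleteFiles().forEach(rowDelta::addDeletes);
        rowDelta.commit();
      }
    }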
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index ea3b29b439..552bffa236 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -81,12 +81,19 @@ public class HiveIcebergOutputFormat<T> implements
OutputFormat<NullWritable, Co
.operationId(operationId)
.build();
String tableName = jc.get(Catalogs.NAME);
- HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table,
fileFormat, schema, null, fileFormat,
- null, null, null, schema);
if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
+ HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table,
fileFormat, schema, null, fileFormat,
+ null, null, null, schema);
return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat,
writerFactory, outputFileFactory, io,
targetFileSize, taskAttemptID, tableName);
+ } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
+ HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table,
fileFormat, schema, null, fileFormat,
+ null, null, null, null);
+ return new HiveIcebergUpdateWriter(schema, table.specs(),
table.spec().specId(), fileFormat, writerFactory,
+ outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
} else {
+ HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table,
fileFormat, schema, null, fileFormat,
+ null, null, null, schema);
return new HiveIcebergRecordWriter(schema, table.specs(),
table.spec().specId(), fileFormat, writerFactory,
outputFileFactory, io, targetFileSize, taskAttemptID, tableName,
false);
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index eecf1518cd..a4470058c7 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -128,40 +128,48 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
}
- String tableName = serDeProperties.getProperty(Catalogs.NAME);
+ this.projectedSchema = projectedSchema(configuration,
serDeProperties.getProperty(Catalogs.NAME), tableSchema);
+
+ // Currently ClusteredWriter is used which requires that records are
ordered by partition keys.
+ // Here we ensure that SortedDynPartitionOptimizer will kick in and do the
sorting.
+ // TODO: remove once we have both Fanout and ClusteredWriter available:
HIVE-25948
+ HiveConf.setIntVar(configuration,
HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, 1);
+ HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE,
"nonstrict");
+
+ try {
+ this.inspector = IcebergObjectInspector.create(projectedSchema);
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ private static Schema projectedSchema(Configuration configuration, String
tableName, Schema tableSchema) {
if (HiveIcebergStorageHandler.isDelete(configuration, tableName)) {
// when writing delete files, we should use the full delete schema
- projectedSchema =
IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
+ return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
+ } else if (HiveIcebergStorageHandler.isUpdate(configuration, tableName)) {
+ // when writing files for an update, we should use the full update schema
+ return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
} else if (HiveIcebergStorageHandler.isWrite(configuration, tableName)) {
- // when writing out data, we should not do projection pushdown
- projectedSchema = tableSchema;
+ // when writing out data, we should not do projection push down
+ return tableSchema;
} else {
configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
String[] selectedColumns =
ColumnProjectionUtils.getReadColumnNames(configuration);
// When same table is joined multiple times, it is possible some
selected columns are duplicated,
// in this case wrong recordStructField position leads wrong value or
ArrayIndexOutOfBoundException
String[] distinctSelectedColumns =
Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
- projectedSchema = distinctSelectedColumns.length > 0 ?
- tableSchema.caseInsensitiveSelect(distinctSelectedColumns) :
tableSchema;
+ Schema projectedSchema = distinctSelectedColumns.length > 0 ?
+ tableSchema.caseInsensitiveSelect(distinctSelectedColumns) :
tableSchema;
// the input split mapper handles does not belong to this table
// it is necessary to ensure projectedSchema equals to tableSchema,
// or we cannot find selectOperator's column from inspector
if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
- projectedSchema = tableSchema;
+ return tableSchema;
+ } else {
+ return projectedSchema;
}
}
-
- // Currently ClusteredWriter is used which requires that records are
ordered by partition keys.
- // Here we ensure that SortedDynPartitionOptimizer will kick in and do the
sorting.
- // TODO: remove once we have both Fanout and ClusteredWriter available:
HIVE-25948
- HiveConf.setIntVar(configuration,
HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, 1);
- HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE,
"nonstrict");
-
- try {
- this.inspector = IcebergObjectInspector.create(projectedSchema);
- } catch (Exception e) {
- throw new SerDeException(e);
- }
}
private void createTableForCTAS(Configuration configuration, Properties
serDeProperties) {
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 6fdddb9b34..39042691e8 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -355,7 +355,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
throws SemanticException {
// delete records are already clustered by partition spec id and the hash
of the partition struct
// there is no need to do any additional sorting based on partition columns
- if (getOperationType().equals(Context.Operation.DELETE.name())) {
+ if (getOperationType().equals(Operation.DELETE.name())) {
return null;
}
@@ -380,12 +380,13 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
fieldOrderMap.put(fields.get(i).name(), i);
}
+ int offset = acidSelectColumns(hmsTable,
Operation.valueOf(getOperationType())).size();
for (PartitionTransformSpec spec : partitionTransformSpecs) {
int order = fieldOrderMap.get(spec.getColumnName());
if
(PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
- customSortExprs.add(BUCKET_SORT_EXPR.apply(order,
spec.getTransformParam().get()));
+ customSortExprs.add(BUCKET_SORT_EXPR.apply(order + offset,
spec.getTransformParam().get()));
} else {
- customSortExprs.add(cols -> cols.get(order).clone());
+ customSortExprs.add(cols -> cols.get(order + offset).clone());
}
}
@@ -490,17 +491,30 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
@Override
- public List<FieldSchema>
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
- // TODO: make it configurable whether we want to include the table columns
in the select query
- // it might make delete writes faster if we don't have to write out the
row object
- return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ public List<FieldSchema>
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation
operation) {
+ switch (operation) {
+ case DELETE:
+ case UPDATE:
+ // TODO: make it configurable whether we want to include the table
columns in the select query.
+ // It might make delete writes faster if we don't have to write out
the row object
+ return Stream.of(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols())
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ default:
+ return ImmutableList.of();
+ }
}
@Override
- public List<FieldSchema>
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
- return ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
+ public List<FieldSchema>
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation
operation) {
+ switch (operation) {
+ case DELETE:
+ return ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
+ default:
+ // For update operations we use the same sort order defined by
+ // {@link #createDPContext(HiveConf,
org.apache.hadoop.hive.ql.metadata.Table)}
+ return ImmutableList.of();
+ }
}
private void setCommonJobConf(JobConf jobConf) {
@@ -543,12 +557,17 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
public static boolean isWrite(Configuration conf, String tableName) {
- return conf != null && tableName != null &&
Context.Operation.OTHER.name().equals(
+ return conf != null && tableName != null && Operation.OTHER.name().equals(
conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
}
public static boolean isDelete(Configuration conf, String tableName) {
- return conf != null && tableName != null &&
Context.Operation.DELETE.name().equals(
+ return conf != null && tableName != null && Operation.DELETE.name().equals(
+ conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
+ }
+
+ public static boolean isUpdate(Configuration conf, String tableName) {
+ return conf != null && tableName != null && Operation.UPDATE.name().equals(
conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
}
@@ -788,8 +807,8 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
private String getOperationType() {
- return SessionStateUtil.getProperty(conf,
Context.Operation.class.getSimpleName())
- .orElse(Context.Operation.OTHER.name());
+ return SessionStateUtil.getProperty(conf, Operation.class.getSimpleName())
+ .orElse(Operation.OTHER.name());
}
private static class NonSerializingConfig implements Serializable {
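The new isUpdate() helper above follows the same pattern as isWrite() and isDelete(): all three key off a per-table operation entry stored in the job configuration. A small sketch of that lookup, using only names that appear in this diff (the helper method itself is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.Context.Operation;
    import org.apache.iceberg.mr.InputFormatConfig;

    // Hypothetical convenience method: resolve the operation recorded for a table,
    // defaulting to OTHER (plain read/insert) when nothing was stored.
    static Operation operationFor(Configuration conf, String tableName) {
      if (conf == null || tableName == null) {
        return Operation.OTHER;
      }
      String stored = conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName);
      return stored != null ? Operation.valueOf(stored) : Operation.OTHER;
    }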
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index faa2ca3ec1..73fe9743e7 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -115,20 +115,20 @@ public class IcebergAcidUtil {
* @return The schema for reading files, extended with metadata columns
needed for deletes
*/
public static Schema createFileReadSchemaForUpdate(List<Types.NestedField>
dataCols, Table table) {
- List<Types.NestedField> cols =
Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
- SERDE_META_COLS.forEach((metaCol, index) -> {
+ List<Types.NestedField> cols =
Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
+ FILE_READ_META_COLS.forEach((metaCol, index) -> {
if (metaCol == PARTITION_STRUCT_META_COL) {
cols.add(MetadataColumns.metadataColumn(table,
MetadataColumns.PARTITION_COLUMN_NAME));
} else {
cols.add(metaCol);
}
});
- // New column values
- cols.addAll(dataCols);
// Old column values
cols.addAll(dataCols.stream()
.map(f -> Types.NestedField.optional(1147483545 + f.fieldId(),
"__old_value_for" + f.name(), f.type()))
.collect(Collectors.toList()));
+ // New column values
+ cols.addAll(dataCols);
return new Schema(cols);
}
@@ -139,12 +139,12 @@ public class IcebergAcidUtil {
public static Schema createSerdeSchemaForUpdate(List<Types.NestedField>
dataCols) {
List<Types.NestedField> cols =
Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
- // New column values
- cols.addAll(dataCols);
// Old column values
cols.addAll(dataCols.stream()
.map(f -> Types.NestedField.optional(1147483545 + f.fieldId(),
"__old_value_for_" + f.name(), f.type()))
.collect(Collectors.toList()));
+ // New column values
+ cols.addAll(dataCols);
return new Schema(cols);
}
@@ -154,7 +154,7 @@ public class IcebergAcidUtil {
* @param original The record object to populate. The end result is the
original record before the update.
*/
public static void populateWithOriginalValues(Record rec, Record original) {
- int dataOffset = SERDE_META_COLS.size() + original.size();
+ int dataOffset = SERDE_META_COLS.size();
for (int i = dataOffset; i < dataOffset + original.size(); ++i) {
original.set(i - dataOffset, rec.get(i));
}
@@ -166,7 +166,7 @@ public class IcebergAcidUtil {
* @param newRecord The record object to populate. The end result is the new
record after the update.
*/
public static void populateWithNewValues(Record rec, Record newRecord) {
- int dataOffset = SERDE_META_COLS.size();
+ int dataOffset = SERDE_META_COLS.size() + newRecord.size();
for (int i = dataOffset; i < dataOffset + newRecord.size(); ++i) {
newRecord.set(i - dataOffset, rec.get(i));
}
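The reordering above matters because the flattened UPDATE record is laid out as [serde meta columns][old column values][new column values], and the two populate helpers slice it by offset. A minimal sketch of that layout, mirroring the fixed offsets (metaColCount stands in for SERDE_META_COLS.size(); the method itself is illustrative):

    import org.apache.iceberg.data.Record;

    // Illustrative only - mirrors populateWithOriginalValues()/populateWithNewValues() above.
    static void splitUpdateRecord(Record rec, Record original, Record newRecord, int metaColCount) {
      int dataColCount = original.size();
      // old column values start right after the meta columns
      for (int i = 0; i < dataColCount; i++) {
        original.set(i, rec.get(metaColCount + i));
      }
      // the new column values follow the old ones
      for (int i = 0; i < dataColCount; i++) {
        newRecord.set(i, rec.get(metaColCount + dataColCount + i));
      }
    }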
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 085491ffa8..82636ff95a 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -264,7 +264,8 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
while (true) {
if (currentIterator.hasNext()) {
current = currentIterator.next();
- if (HiveIcebergStorageHandler.isDelete(conf,
conf.get(Catalogs.NAME))) {
+ if (HiveIcebergStorageHandler.isDelete(conf,
conf.get(Catalogs.NAME)) ||
+ HiveIcebergStorageHandler.isUpdate(conf,
conf.get(Catalogs.NAME))) {
GenericRecord rec = (GenericRecord) current;
PositionDeleteInfo.setIntoConf(conf,
IcebergAcidUtil.parseSpecId(rec),
@@ -512,6 +513,11 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
readSchema =
IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
}
+ // for UPDATE queries, add additional metadata columns into the read
schema
+ if (HiveIcebergStorageHandler.isUpdate(conf, conf.get(Catalogs.NAME))) {
+ readSchema =
IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
+ }
+
return readSchema;
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
index 25c04aebfd..df61cfd8c3 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -151,6 +151,29 @@ public class TestHelper {
return appender().writeFile(partition, records);
}
+ public Map<DataFile, List<Record>> writeFiles(List<Record> rowSet) throws
IOException {
+ // The rows grouped by partition
+ Map<PartitionKey, List<Record>> rows = Maps.newHashMap();
+ PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+ for (Record record : rowSet) {
+ partitionKey.partition(record);
+ List<Record> partitionRows = rows.get(partitionKey);
+ if (partitionRows == null) {
+ partitionRows = Lists.newArrayList();
+ rows.put(partitionKey.copy(), partitionRows);
+ }
+
+ partitionRows.add(record);
+ }
+
+ // Write out the partitions one-by-one
+ Map<DataFile, List<Record>> dataFiles =
Maps.newHashMapWithExpectedSize(rows.size());
+ for (PartitionKey partition : rows.keySet()) {
+ dataFiles.put(writeFile(partition, rows.get(partition)),
rows.get(partition));
+ }
+
+ return dataFiles;
+ }
+
private GenericAppenderHelper appender() {
return new GenericAppenderHelper(table, fileFormat, tmp, conf);
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
index 1c9d3e1922..016fea5a09 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
@@ -383,6 +383,186 @@ public class TestHiveIcebergV2 extends
HiveIcebergStorageHandlerWithEngineBase {
}
}
+ @Test
+ public void testUpdateStatementUnpartitioned() {
+ // create and insert an initial batch of records
+ testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ PartitionSpec.unpartitioned(), fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+ // insert one more batch so that we have multiple data files within the
same partition
+
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
+ TableIdentifier.of("default", "customers"), false));
+
+ shell.executeStatement("UPDATE customers SET last_name='Changed' WHERE
customer_id=3 or first_name='Joanna'");
+
+ List<Object[]> objects =
+ shell.executeStatement("SELECT * FROM customers ORDER BY customer_id,
last_name, first_name");
+ Assert.assertEquals(12, objects.size());
+ List<Record> expected =
TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(1L, "Joanna", "Changed")
+ .add(1L, "Sharon", "Taylor")
+ .add(2L, "Joanna", "Changed")
+ .add(2L, "Jake", "Donnel")
+ .add(2L, "Susan", "Morrison")
+ .add(2L, "Bob", "Silver")
+ .add(3L, "Blake", "Changed")
+ .add(3L, "Marci", "Changed")
+ .add(3L, "Trudy", "Changed")
+ .add(3L, "Trudy", "Changed")
+ .add(4L, "Laci", "Zold")
+ .add(5L, "Peti", "Rozsaszin")
+ .build();
+ HiveIcebergTestUtils.validateData(expected,
+
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
objects), 0);
+ }
+
+ @Test
+ public void testUpdateStatementPartitioned() {
+ PartitionSpec spec =
PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .identity("last_name").bucket("customer_id", 16).build();
+
+ // create and insert an initial batch of records
+ testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+ // insert one more batch so that we have multiple data files within the
same partition
+
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
+ TableIdentifier.of("default", "customers"), false));
+
+ shell.executeStatement("UPDATE customers SET last_name='Changed' WHERE
customer_id=3 or first_name='Joanna'");
+
+ List<Object[]> objects =
+ shell.executeStatement("SELECT * FROM customers ORDER BY customer_id,
last_name, first_name");
+ Assert.assertEquals(12, objects.size());
+ List<Record> expected =
TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(1L, "Joanna", "Changed")
+ .add(1L, "Sharon", "Taylor")
+ .add(2L, "Joanna", "Changed")
+ .add(2L, "Jake", "Donnel")
+ .add(2L, "Susan", "Morrison")
+ .add(2L, "Bob", "Silver")
+ .add(3L, "Blake", "Changed")
+ .add(3L, "Marci", "Changed")
+ .add(3L, "Trudy", "Changed")
+ .add(3L, "Trudy", "Changed")
+ .add(4L, "Laci", "Zold")
+ .add(5L, "Peti", "Rozsaszin")
+ .build();
+ HiveIcebergTestUtils.validateData(expected,
+
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
objects), 0);
+ }
+
+ @Test
+ public void testUpdateStatementWithOtherTable() {
+ PartitionSpec spec =
PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .identity("last_name").bucket("customer_id", 16).build();
+
+ // create a couple of tables, with an initial batch of records
+ testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+ testTables.createTable(shell, "other",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1, 2);
+
+ shell.executeStatement("UPDATE customers SET last_name='Changed' WHERE
customer_id in " +
+ "(select t1.customer_id from customers t1 join other t2 on
t1.customer_id = t2.customer_id) or " +
+ "first_name in (select first_name from customers where first_name =
'Bob')");
+
+ List<Object[]> objects =
+ shell.executeStatement("SELECT * FROM customers ORDER BY customer_id,
last_name, last_name");
+ Assert.assertEquals(9, objects.size());
+ List<Record> expected =
TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .add(1L, "Joanna", "Pierce")
+ .add(1L, "Sharon", "Taylor")
+ .add(2L, "Bob", "Changed")
+ .add(2L, "Jake", "Donnel")
+ .add(2L, "Susan", "Morrison")
+ .add(2L, "Joanna", "Silver")
+ .add(3L, "Blake", "Changed")
+ .add(3L, "Trudy", "Changed")
+ .add(3L, "Trudy", "Changed")
+ .build();
+ HiveIcebergTestUtils.validateData(expected,
+
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
objects), 0);
+ }
+
+ @Test
+ public void testUpdateStatementWithPartitionAndSchemaEvolution() {
+ PartitionSpec spec =
PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .identity("last_name").bucket("customer_id", 16).build();
+
+ // create and insert an initial batch of records
+ testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+ // insert one more batch so that we have multiple data files within the
same partition
+
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
+ TableIdentifier.of("default", "customers"), false));
+
+ // change the partition spec + schema, and insert some new records
+ shell.executeStatement("ALTER TABLE customers SET PARTITION SPEC
(bucket(64, last_name))");
+ shell.executeStatement("ALTER TABLE customers ADD COLUMNS (department
string)");
+ shell.executeStatement("ALTER TABLE customers CHANGE COLUMN first_name
given_name string first");
+ shell.executeStatement("INSERT INTO customers VALUES ('Natalie', 20,
'Bloom', 'Finance'), ('Joanna', 22, " +
+ "'Huberman', 'Operations')");
+
+ // update should handle changing records both from older specs and from
the new spec without problems
+ // there are records with Joanna in both the old and new spec, as well as
the old and new schema
+ shell.executeStatement("UPDATE customers set last_name='Changed' WHERE
customer_id=3 or given_name='Joanna'");
+
+ List<Object[]> objects =
+ shell.executeStatement("SELECT * FROM customers ORDER BY customer_id,
last_name, given_name");
+ Assert.assertEquals(14, objects.size());
+
+ Schema newSchema = new Schema(
+ optional(2, "given_name", Types.StringType.get()),
+ optional(1, "customer_id", Types.LongType.get()),
+ optional(3, "last_name", Types.StringType.get(), "This is last name"),
+ optional(4, "department", Types.StringType.get())
+ );
+ List<Record> expected = TestHelper.RecordsBuilder.newInstance(newSchema)
+ .add("Joanna", 1L, "Changed", null)
+ .add("Sharon", 1L, "Taylor", null)
+ .add("Joanna", 2L, "Changed", null)
+ .add("Jake", 2L, "Donnel", null)
+ .add("Susan", 2L, "Morrison", null)
+ .add("Bob", 2L, "Silver", null)
+ .add("Blake", 3L, "Changed", null)
+ .add("Marci", 3L, "Changed", null)
+ .add("Trudy", 3L, "Changed", null)
+ .add("Trudy", 3L, "Changed", null)
+ .add("Laci", 4L, "Zold", null)
+ .add("Peti", 5L, "Rozsaszin", null)
+ .add("Natalie", 20L, "Bloom", "Finance")
+ .add("Joanna", 22L, "Changed", "Operations")
+ .build();
+ HiveIcebergTestUtils.validateData(expected,
HiveIcebergTestUtils.valueForRow(newSchema, objects), 0);
+ }
+
+ @Test
+ public void testUpdateForSupportedTypes() throws IOException {
+ for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
+ Type type = SUPPORTED_TYPES.get(i);
+
+ // TODO: remove this filter when issue #1881 is resolved
+ if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
+ continue;
+ }
+
+ // TODO: remove this filter when we figure out how we could test binary
types
+ if (type == Types.BinaryType.get() ||
type.equals(Types.FixedType.ofLength(5))) {
+ continue;
+ }
+
+ String tableName = type.typeId().toString().toLowerCase() + "_table_" +
i;
+ String columnName = type.typeId().toString().toLowerCase() + "_column";
+
+ Schema schema = new Schema(required(1, columnName, type));
+ List<Record> originalRecords = TestHelper.generateRandomRecords(schema,
1, 0L);
+ Table table = testTables.createTable(shell, tableName, schema,
fileFormat, originalRecords, 2);
+
+ List<Record> newRecords = TestHelper.generateRandomRecords(schema, 1,
3L);
+ shell.executeStatement(testTables.getUpdateQuery(tableName,
newRecords.get(0)));
+ HiveIcebergTestUtils.validateData(table, newRecords, 0);
+ }
+ }
+
private static <T> PositionDelete<T> positionDelete(CharSequence path, long
pos, T row) {
PositionDelete<T> positionDelete = PositionDelete.create();
return positionDelete.set(path, pos, row);
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index 7f43dc28e7..d05d0bf250 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -205,6 +205,7 @@ public class TestHiveShell {
// Disable vectorization for HiveIcebergInputFormat
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
hiveConf.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
DbTxnManager.class.getName());
hiveConf.set(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER.varname,
SQLStdHiveAuthorizerFactory.class.getName());
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4bf7f88652..91c70dde62 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -316,6 +316,15 @@ abstract class TestTables {
return query.toString();
}
+ public String getUpdateQuery(String tableName, Record record) {
+ StringBuilder query = new StringBuilder("UPDATE
").append(tableName).append(" SET ");
+
+ query.append(record.struct().fields().stream()
+ .map(field -> field.name() + "=" +
getStringValueForInsert(record.getField(field.name()), field.type()))
+ .collect(Collectors.joining(",")));
+ return query.toString();
+ }
+
/**
* Creates a Hive test table. Creates the Iceberg table/data and creates the
corresponding Hive table as well when
* needed. The table will be in the 'default' database. The table will be
populated with the provided with randomly
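Note that the getUpdateQuery() helper added above emits no WHERE clause, so the generated statement rewrites every row of the table; testUpdateForSupportedTypes relies on this. For a hypothetical single-column table the generated text would look roughly like:

    // Roughly what getUpdateQuery("string_table_0", record) would return for a
    // hypothetical single string column (value formatting comes from getStringValueForInsert):
    String example = "UPDATE string_table_0 SET string_column='some random value'";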
diff --git
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_avro.q
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_avro.q
new file mode 100644
index 0000000000..648b422527
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_avro.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) partitioned by spec
(bucket(16, a), truncate(3, b)) stored by iceberg stored as avro tblproperties
('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52),
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changes' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another iceberg table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_orc.q
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_orc.q
new file mode 100644
index 0000000000..a8ddb7fc3e
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_orc.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) partitioned by spec
(bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties
('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52),
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_parquet.q
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_parquet.q
new file mode 100644
index 0000000000..c207f956c5
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_partitioned_parquet.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) partitioned by spec
(bucket(16, a), truncate(3, b)) stored by iceberg tblproperties
('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52),
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git
a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_unpartitioned_parquet.q
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_unpartitioned_parquet.q
new file mode 100644
index 0000000000..ecc7ce02e6
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_unpartitioned_parquet.q
@@ -0,0 +1,26 @@
+drop table if exists tbl_ice;
+create external table tbl_ice(a int, b string, c int) stored by iceberg
tblproperties ('format-version'='2');
+
+-- update using simple predicates
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52),
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22;
+select * from tbl_ice order by a, b, c;
+
+-- update using subqueries referencing the same table
+insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
+update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & another table
+drop table if exists tbl_ice_other;
+create external table tbl_ice_other(a int, b string) stored by iceberg;
+insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
+update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
+
+-- update using a join subquery between the same table & a non-iceberg table
+drop table if exists tbl_standard_other;
+create external table tbl_standard_other(a int, b string) stored as orc;
+insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
+update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a);
+select * from tbl_ice order by a, b, c;
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
index 2a6ca684ed..fb85bad5f1 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
index b31e6322a2..f8641e4b09 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
index 5535baa16a..e5199c19c6 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
index e0355ce680..02d06c9097 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
@@ -46,8 +46,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
similarity index 72%
copy from
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
copy to
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
index 2a6ca684ed..351011cb47 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_avro.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50),
(2, 'two', 51), (3,
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or a
= 22
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or
a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changes 50
2 two 51
3 three 52
+4 Changes 53
5 five 54
+111 Changes 55
333 two 56
PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
801)
PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changes 55
333 two 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_ice_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10,
'ten'), (333, 'hundred')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changes 55
+333 Changed forever 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_standard_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_standard_other
POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changes 55
+333 Changed forever 56
+444 The last one 800
+555 Changed again 801
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
similarity index 72%
copy from
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
copy to
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
index b31e6322a2..4fbe5c0500 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_orc.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50),
(2, 'two', 51), (3,
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a
= 22
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or
a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed 50
2 two 51
3 three 52
+4 Changed 53
5 five 54
+111 Changed 55
333 two 56
PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
801)
PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
333 two 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_ice_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10,
'ten'), (333, 'hundred')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
+333 Changed forever 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_standard_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_standard_other
POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
+333 Changed forever 56
+444 The last one 800
+555 Changed again 801
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
similarity index 71%
copy from
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
copy to
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
index 5535baa16a..46913856ed 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_partitioned_parquet.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50),
(2, 'two', 51), (3,
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a
= 22
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or
a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed 50
2 two 51
3 three 52
+4 Changed 53
5 five 54
+111 Changed 55
333 two 56
PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
801)
PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
333 two 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_ice_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10,
'ten'), (333, 'hundred')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
+333 Changed forever 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_standard_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_standard_other
POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
+333 Changed forever 56
+444 The last one 800
+555 Changed again 801
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
similarity index 71%
copy from
iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
copy to
iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
index e0355ce680..1b0d9d5bcd 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/delete_iceberg_unpartitioned_parquet.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
@@ -18,25 +18,28 @@ POSTHOOK: query: insert into tbl_ice values (1, 'one', 50),
(2, 'two', 51), (3,
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a
= 22
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where b in ('one', 'four') or a = 22
+POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or
a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed 50
2 two 51
3 three 52
+4 Changed 53
5 five 54
+111 Changed 55
333 two 56
PREHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
801)
PREHOOK: type: QUERY
@@ -46,26 +49,33 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola',
800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[57][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
-PREHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+Warning: Shuffle Join MERGEJOIN[51][tables = [$hdt$_0, $hdt$_1]] in Stage
'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1, $hdt$_2,
$hdt$_3]] in Stage 'Reducer 4' is a cross product
+PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from
tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select a from tbl_ice where a
<= 5) or c in (select c from tbl_ice where c > 800)
+POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a
from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
333 two 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_ice_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_ice_other
@@ -86,25 +96,33 @@ POSTHOOK: query: insert into tbl_ice_other values (10,
'ten'), (333, 'hundred')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice_other
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a
from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_ice_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select
t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select * from tbl_ice order by a
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_ice order by a
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
+333 Changed forever 56
444 hola 800
+555 Changed again 801
PREHOOK: query: drop table if exists tbl_standard_other
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_standard_other
@@ -127,22 +145,30 @@ POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_standard_other
POSTHOOK: Lineage: tbl_standard_other.a SCRIPT []
POSTHOOK: Lineage: tbl_standard_other.b SCRIPT []
-PREHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+PREHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
-POSTHOOK: query: delete from tbl_ice where a in (select t1.a from tbl_ice t1
join tbl_standard_other t2 on t1.a = t2.a)
+POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a
from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
-PREHOOK: query: select count(*) from tbl_ice
+PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select count(*) from tbl_ice
+POSTHOOK: query: select * from tbl_ice order by a, b, c
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: hdfs://### HDFS PATH ###
-0
+1 Changed again 50
+2 Changed again 51
+3 Changed again 52
+4 Changed again 53
+5 Changed again 54
+111 Changed 55
+333 Changed forever 56
+444 The last one 800
+555 Changed again 801
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 3a83855016..70fa42bad6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -50,6 +50,7 @@ import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -98,6 +99,7 @@ import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
@@ -3330,6 +3332,23 @@ public class AcidUtils {
table.getStorageHandler().supportsAcidOperations() !=
HiveStorageHandler.AcidSupportType.NONE;
}
+ /**
+ * Returns the virtual columns needed for update queries. For native ACID
tables it is a single ROW__ID; for non-native
+ * tables the list is provided by {@link
HiveStorageHandler#acidVirtualColumns()}.
+ * @param table The table for which we run the query
+ * @return The list of virtual columns used
+ */
+ public static List<VirtualColumn> getAcidVirtualColumns(Table table) {
+ if (isTransactionalTable(table)) {
+ return Lists.newArrayList(VirtualColumn.ROWID);
+ } else {
+ if (isNonNativeAcidTable(table)) {
+ return table.getStorageHandler().acidVirtualColumns();
+ }
+ }
+ return Collections.emptyList();
+ }
+
public static boolean acidTableWithoutTransactions(Table table) {
return table != null && table.getStorageHandler() != null &&
table.getStorageHandler().supportsAcidOperations() ==
HiveStorageHandler.AcidSupportType.WITHOUT_TRANSACTIONS;
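The new AcidUtils.getAcidVirtualColumns() helper above picks between the single
ROW__ID virtual column used by transactional tables and the handler-provided list
used by non-native ACID tables. A minimal caller-side sketch follows; the wrapper
class and the use of VirtualColumn#getName() to build a select list are
illustrative assumptions, not code from this patch.

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;

public class AcidVirtualColumnsSketch {
  // Builds the leading part of a rewritten select list from the virtual columns
  // that the update/delete rewrite needs for the given table (empty for plain tables).
  static String virtualColumnSelectList(Table table) {
    List<VirtualColumn> virtualColumns = AcidUtils.getAcidVirtualColumns(table);
    return virtualColumns.stream()
        .map(VirtualColumn::getName) // e.g. "ROW__ID" for native ACID tables (assumed accessor)
        .collect(Collectors.joining(","));
  }
}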
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 0784fe65ad..8d75db400b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
@@ -307,8 +308,11 @@ public interface HiveStorageHandler extends Configurable {
/**
* {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer}
rewrites DELETE/UPDATE queries into INSERT
- * queries. E.g. DELETE FROM T WHERE A = 32 is rewritten into
+ * queries.
+ * - DELETE FROM T WHERE A = 32 is rewritten into
* INSERT INTO T SELECT <selectCols> FROM T WHERE A = 32 SORT BY
<sortCols>.
+ * - UPDATE T SET B=12 WHERE A = 32 is rewritten into
+ * INSERT INTO T SELECT <selectCols>, <newValues> FROM T WHERE A
= 32 SORT BY <sortCols>.
*
* This method specifies which columns should be injected into the
<selectCols> part of the rewritten query.
*
@@ -316,9 +320,10 @@ public interface HiveStorageHandler extends Configurable {
* other NONE.
*
* @param table the table which is being deleted/updated/merged into
+ * @param operation the operation type we are executing
* @return the list of columns that should be projected in the rewritten
ACID query
*/
- default List<FieldSchema>
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
+ default List<FieldSchema>
acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation
operation) {
return Collections.emptyList();
}
@@ -333,9 +338,10 @@ public interface HiveStorageHandler extends Configurable {
* other NONE.
*
* @param table the table which is being deleted/updated/merged into
+ * @param operation the operation type we are executing
* @return the list of columns that should be used as sort columns in the
rewritten ACID query
*/
- default List<FieldSchema>
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
+ default List<FieldSchema>
acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation
operation) {
return Collections.emptyList();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 22d6d63e04..9003984567 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -53,6 +53,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -6912,7 +6913,8 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab,
table_desc, input, false);
}
} else {
- if (updating(dest) || deleting(dest)) {
+ // Non-native acid tables should handle their own bucketing for
updates/deletes
+ if ((updating(dest) || deleting(dest)) &&
!AcidUtils.isNonNativeAcidTable(dest_tab)) {
partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
enforceBucketing = true;
}
@@ -7320,7 +7322,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
input = genConstraintsPlan(dest, qb, input);
if (!qb.getIsQuery()) {
- input = genConversionSelectOperator(dest, qb, input,
destinationTable.getDeserializer(), dpCtx, parts);
+ input = genConversionSelectOperator(dest, qb, input,
destinationTable.getDeserializer(), dpCtx, parts, destinationTable);
}
if (destinationTable.isMaterializedView() &&
@@ -7467,7 +7469,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
input = genConstraintsPlan(dest, qb, input);
if (!qb.getIsQuery()) {
- input = genConversionSelectOperator(dest, qb, input,
destinationTable.getDeserializer(), dpCtx, null);
+ input = genConversionSelectOperator(dest, qb, input,
destinationTable.getDeserializer(), dpCtx, null, destinationTable);
}
if (destinationTable.isMaterializedView() &&
@@ -8437,7 +8439,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
* types that are expected by the table_desc.
*/
private Operator genConversionSelectOperator(String dest, QB qb, Operator
input,
- Deserializer deserializer, DynamicPartitionCtx dpCtx, List<FieldSchema>
parts)
+ Deserializer deserializer, DynamicPartitionCtx dpCtx, List<FieldSchema>
parts, Table table)
throws SemanticException {
StructObjectInspector oi = null;
try {
@@ -8466,7 +8468,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
}
// Check column types
- boolean converted = false;
+ AtomicBoolean converted = new AtomicBoolean(false);
int columnNumber = tableFields.size();
List<ExprNodeDesc> expressions = new ArrayList<ExprNodeDesc>(columnNumber);
@@ -8474,52 +8476,33 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
// does the conversion to String by itself.
if (!(deserializer instanceof MetadataTypedColumnsetSerDe) &&
!deleting(dest)) {
- // If we're updating, add the ROW__ID expression, then make the
following column accesses
- // offset by 1 so that we don't try to convert the ROW__ID
- if (updating(dest)) {
- expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
- rowFields.get(0).getInternalName(), "", true));
+ // If we're updating, add the required virtual columns.
+ int virtualColumnSize = updating(dest) ?
AcidUtils.getAcidVirtualColumns(table).size() : 0;
+ for (int i = 0; i < virtualColumnSize; i++) {
+ expressions.add(new ExprNodeColumnDesc(rowFields.get(i).getType(),
+ rowFields.get(i).getInternalName(), "", true));
}
// here only deals with non-partition columns. We deal with partition
columns next
+ int rowFieldsOffset = expressions.size();
for (int i = 0; i < columnNumber; i++) {
- int rowFieldsOffset = updating(dest) ? i + 1 : i;
- ObjectInspector tableFieldOI = tableFields.get(i)
- .getFieldObjectInspector();
- TypeInfo tableFieldTypeInfo = TypeInfoUtils
- .getTypeInfoFromObjectInspector(tableFieldOI);
- TypeInfo rowFieldTypeInfo = rowFields.get(rowFieldsOffset).getType();
- ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo,
- rowFields.get(rowFieldsOffset).getInternalName(), "", false,
- rowFields.get(rowFieldsOffset).isSkewedCol());
- // LazySimpleSerDe can convert any types to String type using
- // JSON-format. However, we may add more operators.
- // Thus, we still keep the conversion.
- if (!tableFieldTypeInfo.equals(rowFieldTypeInfo)) {
- // need to do some conversions here
- converted = true;
- if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) {
- // cannot convert to complex types
- column = null;
- } else {
- column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
- .createConversionCast(column,
(PrimitiveTypeInfo)tableFieldTypeInfo);
- }
- if (column == null) {
- String reason = "Cannot convert column " + i + " from "
- + rowFieldTypeInfo + " to " + tableFieldTypeInfo + ".";
- throw new SemanticException(ASTErrorUtils.getMsg(
- ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(),
- qb.getParseInfo().getDestForClause(dest), reason));
- }
- }
+ ExprNodeDesc column = handleConversion(tableFields.get(i),
rowFields.get(rowFieldsOffset + i), converted, dest, i);
expressions.add(column);
}
+ // For non-native ACID tables we should convert the new values as well
+ rowFieldsOffset = expressions.size();
+ if (updating(dest) && AcidUtils.isNonNativeAcidTable(table)) {
+ for (int i = 0; i < columnNumber; i++) {
+ ExprNodeDesc column = handleConversion(tableFields.get(i),
rowFields.get(rowFieldsOffset + i), converted, dest, i);
+ expressions.add(column);
+ }
+ }
+
// deal with dynamic partition columns
+ rowFieldsOffset = expressions.size();
if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
// rowFields contains non-partitioned columns (tableFields) followed
by DP columns
- int rowFieldsOffset = tableFields.size() + (updating(dest) ? 1 : 0);
for (int dpColIdx = 0; dpColIdx < rowFields.size() - rowFieldsOffset;
++dpColIdx) {
// create ExprNodeDesc
@@ -8549,7 +8532,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
if (!partitionTypeInfo.equals(inputTypeInfo)) {
column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
.createConversionCast(column, partitionTypeInfo);
- converted = true;
+ converted.set(true);
}
} else {
LOG.warn("Partition schema for dynamic partition " +
inputColumn.getAlias() + " ("
@@ -8562,7 +8545,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
}
}
- if (converted) {
+ if (converted.get()) {
// add the select operator
RowResolver rowResolver = new RowResolver();
List<String> colNames = new ArrayList<String>();
@@ -8582,6 +8565,53 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
return input;
}
+ /**
+ * Creates an expression for converting a row column to the type of the target
table column. For example,
+ * if the table column is int but the query provides a string in the row, we
need to cast automatically.
+ * @param tableField The target table column
+ * @param rowField The source row column
+ * @param conversion Set to true if we detect that a conversion is needed; this
acts as a
+ * hidden return value, notifying the caller that a cast was
added
+ * @param dest The destination table for the error message
+ * @param columnNum The destination column id for the error message
+ * @return The expression describing the selected column. Note that
`conversion` can be considered a return value
+ * as well
+ * @throws SemanticException If a conversion was needed but no automatic
conversion is available
+ */
+ private ExprNodeDesc handleConversion(StructField tableField, ColumnInfo
rowField, AtomicBoolean conversion, String dest, int columnNum)
+ throws SemanticException {
+ ObjectInspector tableFieldOI = tableField
+ .getFieldObjectInspector();
+ TypeInfo tableFieldTypeInfo = TypeInfoUtils
+ .getTypeInfoFromObjectInspector(tableFieldOI);
+ TypeInfo rowFieldTypeInfo = rowField.getType();
+ ExprNodeDesc column = new ExprNodeColumnDesc(rowFieldTypeInfo,
+ rowField.getInternalName(), "", false,
+ rowField.isSkewedCol());
+ // LazySimpleSerDe can convert any types to String type using
+ // JSON-format. However, we may add more operators.
+ // Thus, we still keep the conversion.
+ if (!tableFieldTypeInfo.equals(rowFieldTypeInfo)) {
+ // need to do some conversions here
+ conversion.set(true);
+ if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+ // cannot convert to complex types
+ column = null;
+ } else {
+ column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
+ .createConversionCast(column,
(PrimitiveTypeInfo)tableFieldTypeInfo);
+ }
+ if (column == null) {
+ String reason = "Cannot convert column " + columnNum + " from "
+ + rowFieldTypeInfo + " to " + tableFieldTypeInfo + ".";
+ throw new SemanticException(ASTErrorUtils.getMsg(
+ ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(),
+ qb.getParseInfo().getDestForClause(dest), reason));
+ }
+ }
+ return column;
+ }
+
@SuppressWarnings("nls")
private Operator genLimitPlan(String dest, Operator input, int offset, int
limit) {
// A map-only job can be optimized - instead of converting it to a
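The extracted handleConversion() helper wraps a conversion cast around a row
expression whenever its type differs from the target table column, which lets the
same logic cover both the original columns and the duplicated new-value columns
appended for non-native ACID updates. Below is a minimal, self-contained sketch
of that cast; the literal, the chosen types and the import paths are assumptions
for illustration, not part of this patch.

import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class ConversionCastSketch {
  // The target column is string but the rewritten UPDATE supplies an int literal,
  // so a cast has to be wrapped around the expression before it reaches the sink.
  static ExprNodeDesc castToTargetType() throws SemanticException {
    TypeInfo tableType = TypeInfoFactory.stringTypeInfo;
    ExprNodeDesc column = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 12);
    if (!tableType.equals(column.getTypeInfo())) {
      column = ExprNodeTypeCheck.getExprNodeDefaultExprProcessor()
          .createConversionCast(column, (PrimitiveTypeInfo) tableType);
    }
    return column;
  }
}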
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index deff4f3c20..70121096a8 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
/**
@@ -112,12 +111,17 @@ public class UpdateDeleteSemanticAnalyzer extends
RewriteSemanticAnalyzer {
addPartitionColsToInsert(mTable.getPartCols(), rewrittenQueryStr);
boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
+ int columnOffset;
if (nonNativeAcid) {
- String selectCols =
mTable.getStorageHandler().acidSelectColumns(mTable).stream()
- .map(FieldSchema::getName).collect(Collectors.joining(","));
+ List<FieldSchema> acidColumns =
mTable.getStorageHandler().acidSelectColumns(mTable, operation);
+ String selectCols = acidColumns.stream()
+ .map(fieldSchema ->
HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf))
+ .collect(Collectors.joining(","));
rewrittenQueryStr.append(" select ").append(selectCols);
+ columnOffset = acidColumns.size();
} else {
rewrittenQueryStr.append(" select ROW__ID");
+ columnOffset = 1;
}
Map<Integer, ASTNode> setColExprs = null;
@@ -143,7 +147,7 @@ public class UpdateDeleteSemanticAnalyzer extends
RewriteSemanticAnalyzer {
// This is one of the columns we're setting, record it's position so
we can come back
// later and patch it up.
// Add one to the index because the select has the ROW__ID as the
first column.
- setColExprs.put(i + 1, setCol);
+ setColExprs.put(columnOffset + i, setCol);
}
}
}
@@ -162,9 +166,11 @@ public class UpdateDeleteSemanticAnalyzer extends
RewriteSemanticAnalyzer {
// Add a sort by clause so that the row ids come out in the correct order
if (nonNativeAcid) {
- String sortCols =
mTable.getStorageHandler().acidSortColumns(mTable).stream()
- .map(FieldSchema::getName).collect(Collectors.joining(","));
- rewrittenQueryStr.append(" sort by ").append(sortCols).append(" ");
+ List<FieldSchema> sortColumns =
mTable.getStorageHandler().acidSortColumns(mTable, operation);
+ if (!sortColumns.isEmpty()) {
+ String sortCols =
sortColumns.stream().map(FieldSchema::getName).collect(Collectors.joining(","));
+ rewrittenQueryStr.append(" sort by ").append(sortCols).append(" ");
+ }
} else {
rewrittenQueryStr.append(" sort by ROW__ID ");
}
@@ -191,16 +197,29 @@ public class UpdateDeleteSemanticAnalyzer extends
RewriteSemanticAnalyzer {
// \-> TOK_INSERT -> TOK_INSERT_INTO
// \-> TOK_SELECT
// \-> TOK_SORTBY
+ // Or
+ // TOK_QUERY -> TOK_FROM
+ // \-> TOK_INSERT -> TOK_INSERT_INTO
+ // \-> TOK_SELECT
+ //
// The following adds the TOK_WHERE and its subtree from the original
query as a child of
// TOK_INSERT, which is where it would have landed if it had been there
originally in the
// string. We do it this way because it's easy then turning the
original AST back into a
- // string and reparsing it. We have to move the SORT_BY over one,
- // so grab it and then push it to the second slot, and put the where in
the first slot
- ASTNode sortBy = (ASTNode)rewrittenInsert.getChildren().get(2);
- assert sortBy.getToken().getType() == HiveParser.TOK_SORTBY :
- "Expected TOK_SORTBY to be first child of TOK_SELECT, but found " +
sortBy.getName();
- rewrittenInsert.addChild(sortBy);
- rewrittenInsert.setChild(2, where);
+ // string and reparsing it.
+ if (rewrittenInsert.getChildren().size() == 3) {
+ // We have to move the SORT_BY over one, so grab it and then push it
to the second slot,
+ // and put the where in the first slot
+ ASTNode sortBy = (ASTNode) rewrittenInsert.getChildren().get(2);
+ assert sortBy.getToken().getType() == HiveParser.TOK_SORTBY :
+ "Expected TOK_SORTBY to be third child of TOK_INSERT, but found "
+ sortBy.getName();
+ rewrittenInsert.addChild(sortBy);
+ rewrittenInsert.setChild(2, where);
+ } else {
+ ASTNode select = (ASTNode) rewrittenInsert.getChildren().get(1);
+ assert select.getToken().getType() == HiveParser.TOK_SELECT :
+ "Expected TOK_SELECT to be second child of TOK_INSERT, but found "
+ select.getName();
+ rewrittenInsert.addChild(where);
+ }
}
// Patch up the projection list for updates, putting back the original set
expressions.
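Putting the pieces together, the rewritten statement for a non-native ACID UPDATE
starts with the handler-provided columns, followed by the table columns (the SET
targets are patched back into the AST afterwards), and ends with the handler's
sort columns when there are any. A rough sketch of the string assembly is below;
the table and column names are placeholders, and the original WHERE clause is
re-attached as an AST child rather than appended as text.

public class UpdateRewriteShapeSketch {
  // Approximates the text UpdateDeleteSemanticAnalyzer assembles before re-parsing it.
  static String rewrittenUpdate(String acidSelectCols, String acidSortCols) {
    StringBuilder sb = new StringBuilder("insert into table tbl_ice");
    sb.append(" select ").append(acidSelectCols)   // columns from acidSelectColumns(table, UPDATE)
      .append(", `a`, `b`, `c`")                   // then the table columns; SET expressions are patched in later
      .append(" from tbl_ice");
    if (!acidSortCols.isEmpty()) {
      sb.append(" sort by ").append(acidSortCols).append(" ");
    }
    // The WHERE subtree from the original query is grafted onto the re-parsed
    // TOK_INSERT node, so it never appears in this string.
    return sb.toString();
  }
}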