This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 9680c30edcb HIVE-27031: Iceberg: Implement Copy-On-Write for Delete Queries. (#4026). (Ayush Saxena, reviewed by Denys Kuzmenko)
9680c30edcb is described below
commit 9680c30edcbfa05d90d37e49cff38af690f5b8d4
Author: Ayush Saxena <[email protected]>
AuthorDate: Wed Feb 15 11:53:46 2023 +0530
    HIVE-27031: Iceberg: Implement Copy-On-Write for Delete Queries. (#4026). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 6 +-
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 23 ++-
.../iceberg/mr/hive/TestHiveIcebergInserts.java | 6 +-
.../apache/iceberg/mr/hive/TestHiveIcebergV2.java | 2 +-
.../test/queries/positive/iceberg_copy_on_write.q | 63 ++++++
.../results/positive/iceberg_copy_on_write.q.out | 219 +++++++++++++++++++++
.../hive/ql/metadata/HiveStorageHandler.java | 10 +
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 65 +++++-
8 files changed, 373 insertions(+), 21 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 6b42add1b4f..174022e5ea2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -918,9 +918,9 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     // tables that don't have this (the default is copy-on-write). We set this at table creation and v1->v2 conversion.
     if ((icebergTable == null || ((BaseTable) icebergTable).operations().current().formatVersion() == 1) &&
         "2".equals(newProps.get(TableProperties.FORMAT_VERSION))) {
-      newProps.put(TableProperties.DELETE_MODE, "merge-on-read");
-      newProps.put(TableProperties.UPDATE_MODE, "merge-on-read");
-      newProps.put(TableProperties.MERGE_MODE, "merge-on-read");
+      newProps.put(TableProperties.DELETE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ);
+      newProps.put(TableProperties.UPDATE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ);
+      newProps.put(TableProperties.MERGE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ);
     }
   }
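
For context, a minimal HiveQL sketch of the two paths this hunk covers (demo_v2 and demo_v1 are illustrative names, not from the patch): creating a table as format-version 2, or converting a v1 table to v2, should leave all three write modes defaulted to merge-on-read.

    create table demo_v2 (id int) stored by iceberg stored as orc
      tblproperties ('format-version'='2');
    -- v1 -> v2 conversion takes the same code path:
    alter table demo_v1 set tblproperties ('format-version'='2');
    -- both are expected to default write.delete.mode, write.update.mode and
    -- write.merge.mode to merge-on-read:
    show tblproperties demo_v2;
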
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 74c123f48d3..d3ccd7b84ee 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
@@ -132,7 +133,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
private static final String TABLE_NAME_SEPARATOR = "..";
private static final String ICEBERG = "iceberg";
private static final String PUFFIN = "puffin";
-
+ public static final String COPY_ON_WRITE = "copy-on-write";
+ public static final String MERGE_ON_READ = "merge-on-read";
   /**
    * Function template for producing a custom sort expression function:
    * Takes the source column index and the bucket count to create a function where Iceberg bucket UDF is used to build
@@ -640,7 +642,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   // TODO: remove the checks as copy-on-write implementations for these DML ops get added
   private static void checkDMLOperationMode(org.apache.hadoop.hive.ql.metadata.Table table) {
     Map<String, String> opTypes = ImmutableMap.of(
-        TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT,
         TableProperties.MERGE_MODE, TableProperties.MERGE_MODE_DEFAULT,
         TableProperties.UPDATE_MODE, TableProperties.UPDATE_MODE_DEFAULT);
@@ -1131,9 +1132,9 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     String formatVersion = origParams.get(TableProperties.FORMAT_VERSION);
     if ("2".equals(formatVersion)) {
       tbl.getParameters().put(TableProperties.FORMAT_VERSION, formatVersion);
-      tbl.getParameters().put(TableProperties.DELETE_MODE, "merge-on-read");
-      tbl.getParameters().put(TableProperties.UPDATE_MODE, "merge-on-read");
-      tbl.getParameters().put(TableProperties.MERGE_MODE, "merge-on-read");
+      tbl.getParameters().put(TableProperties.DELETE_MODE, MERGE_ON_READ);
+      tbl.getParameters().put(TableProperties.UPDATE_MODE, MERGE_ON_READ);
+      tbl.getParameters().put(TableProperties.MERGE_MODE, MERGE_ON_READ);
     }
     // check if the table is being created as a managed table; in that case we translate it to external
@@ -1166,4 +1167,16 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     props.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(origTable.spec()));
     return props;
   }
+
+  @Override
+  public boolean shouldOverwrite(org.apache.hadoop.hive.ql.metadata.Table mTable, String operationName) {
+    String mode = null;
+    String formatVersion = mTable.getTTable().getParameters().get(TableProperties.FORMAT_VERSION);
+    // As of now, only delete mode is supported; return false for all others.
+    if ("2".equals(formatVersion) && operationName.equalsIgnoreCase(Context.Operation.DELETE.toString())) {
+      mode = mTable.getTTable().getParameters()
+          .getOrDefault(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT);
+    }
+    return COPY_ON_WRITE.equalsIgnoreCase(mode);
+  }
 }
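
shouldOverwrite gates the new rewrite narrowly: it returns true only when the table is format-version 2, the operation is a DELETE, and write.delete.mode resolves to copy-on-write. A hedged HiveQL sketch of flipping that switch at runtime (demo_v2 is an illustrative name; the q test below does the same on ice01):

    alter table demo_v2 set tblproperties ('write.delete.mode'='copy-on-write');
    -- subsequent DELETE statements on demo_v2 are rewritten to INSERT OVERWRITE,
    -- while UPDATE and MERGE continue to use the merge-on-read path.
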
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 3fd2c240fe7..74cc5c90159 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -550,9 +550,9 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB
     // Create a V2 table with merge-on-read.
     TableIdentifier target = TableIdentifier.of("default", "target");
     Map<String, String> opTypes = ImmutableMap.of(
-        TableProperties.DELETE_MODE, "merge-on-read",
-        TableProperties.MERGE_MODE, "merge-on-read",
-        TableProperties.UPDATE_MODE, "merge-on-read");
+        TableProperties.DELETE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ,
+        TableProperties.MERGE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ,
+        TableProperties.UPDATE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ);
     Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
         fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2, opTypes);
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
index 054b5f0e3e0..c2a84c12918 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
@@ -241,7 +241,7 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase {
         PartitionSpec.unpartitioned(), fileFormat,
         HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
     // verify delete mode set to merge-on-read
-    Assert.assertEquals("merge-on-read",
+    Assert.assertEquals(HiveIcebergStorageHandler.MERGE_ON_READ,
         shell.metastore().getTable("default", "customers")
             .getParameters().get(TableProperties.DELETE_MODE));
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_copy_on_write.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_copy_on_write.q
new file mode 100644
index 00000000000..b93f1d9e5bd
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_copy_on_write.q
@@ -0,0 +1,63 @@
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask the file size values as they can vary slightly, causing test flakiness
+--! qt:replace:/("added-files-size":")\d+/$1#FileSize#/
+--! qt:replace:/("total-files-size":")\d+/$1#FileSize#/
+--! qt:replace:/("removed-files-size":")\d+/$1#FileSize#/
+
+
+-- create an unpartitioned v2 table
+ create table ice01 (id int, name string) Stored by Iceberg stored as ORC
+ TBLPROPERTIES('format-version'='2');
+
+-- insert some values
+insert into ice01 values (1, 'ABC'),(2, 'CBS'),(3, null),(4, 'POPI'),(5, 'AQWR'),(6, 'POIU'),(9, null),(8,
+'POIKL'),(10, 'YUIO');
+
+-- delete using MOR
+
+delete from ice01 where id>9 OR id=8;
+
+select * from ice01;
+
+-- should be 2 files, one data file and one positional delete file.
+select summary from default.ice01.snapshots;
+
+ALTER TABLE ice01 SET TBLPROPERTIES ('write.delete.mode'='copy-on-write');
+
+-- delete some values
+explain delete from ice01 where id>4 OR id=2;
+delete from ice01 where id>4 OR id=2;
+
+select * from ice01;
+
+-- should be only one data file.
+select summary from default.ice01.snapshots;
+
+-- Null cases.
+
+delete from ice01 where null;
+
+select * from ice01;
+
+delete from ice01 where not null;
+
+select * from ice01;
+
+delete from ice01 where name=null;
+
+select * from ice01;
+
+delete from ice01 where name!=null;
+
+select * from ice01;
+
+--disable cbo due to HIVE-27070
+set hive.cbo.enable=false;
+
+delete from ice01 where name is null;
+
+select * from ice01;
+
+-- clean up
+drop table if exists ice01;
\ No newline at end of file
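
The null-predicate cases above exercise a subtlety of the copy-on-write rewrite (see the analyzer change below): the rewritten query keeps rows for which the original predicate is false or null, so "delete from ice01 where null" must remove nothing. A quick sanity sketch of the generated keep-predicate:

    -- keep-predicate for "where null":
    select * from ice01 where (null is null) or not(null);
    -- (null is null) is true, so every row is kept and the delete is a no-op.
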
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_copy_on_write.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_copy_on_write.q.out
new file mode 100644
index 00000000000..631812392e6
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_copy_on_write.q.out
@@ -0,0 +1,219 @@
+PREHOOK: query: create table ice01 (id int, name string) Stored by Iceberg stored as ORC
+ TBLPROPERTIES('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice01
+POSTHOOK: query: create table ice01 (id int, name string) Stored by Iceberg stored as ORC
+ TBLPROPERTIES('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice01
+PREHOOK: query: insert into ice01 values (1, 'ABC'),(2, 'CBS'),(3, null),(4, 'POPI'),(5, 'AQWR'),(6, 'POIU'),(9, null),(8,
+'POIKL'),(10, 'YUIO')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: insert into ice01 values (1, 'ABC'),(2, 'CBS'),(3, null),(4, 'POPI'),(5, 'AQWR'),(6, 'POIU'),(9, null),(8,
+'POIKL'),(10, 'YUIO')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+PREHOOK: query: delete from ice01 where id>9 OR id=8
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where id>9 OR id=8
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 ABC
+2 CBS
+3 NULL
+4 POPI
+5 AQWR
+6 POIU
+9 NULL
+PREHOOK: query: select summary from default.ice01.snapshots
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select summary from default.ice01.snapshots
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+{"added-data-files":"1","added-records":"9","added-files-size":"#FileSize#","changed-partition-count":"1","total-records":"9","total-files-size":"#FileSize#","total-data-files":"1","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"}
+{"added-position-delete-files":"1","added-delete-files":"1","added-files-size":"#FileSize#","added-position-deletes":"2","changed-partition-count":"1","total-records":"9","total-files-size":"#FileSize#","total-data-files":"1","total-delete-files":"1","total-position-deletes":"2","total-equality-deletes":"0"}
+PREHOOK: query: ALTER TABLE ice01 SET TBLPROPERTIES ('write.delete.mode'='copy-on-write')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: ALTER TABLE ice01 SET TBLPROPERTIES ('write.delete.mode'='copy-on-write')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: explain delete from ice01 where id>4 OR id=2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: explain delete from ice01 where id>4 OR id=2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+Stage-3
+ Stats Work{}
+ Stage-0
+ Move Operator
+ table:{"name:":"default.ice01"}
+ Stage-2
+ Dependency Collection{}
+ Stage-1
+ Map 1 vectorized
+ File Output Operator [FS_7]
+ table:{"name:":"default.ice01"}
+ Select Operator [SEL_6] (rows=7 width=78)
+ Output:["_col0","_col1"]
+ Filter Operator [FIL_5] (rows=7 width=78)
+              predicate:(((id <= 4) and (id <> 2)) or ((id > 4) or (id = 2)) is null)
+              TableScan [TS_0] (rows=9 width=81)
+                default@ice01,ice01,Tbl:COMPLETE,Col:COMPLETE,Output:["id","name"]
+
+PREHOOK: query: delete from ice01 where id>4 OR id=2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where id>4 OR id=2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 ABC
+3 NULL
+4 POPI
+PREHOOK: query: select summary from default.ice01.snapshots
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select summary from default.ice01.snapshots
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+{"added-data-files":"1","added-records":"9","added-files-size":"#FileSize#","changed-partition-count":"1","total-records":"9","total-files-size":"#FileSize#","total-data-files":"1","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"}
+{"added-position-delete-files":"1","added-delete-files":"1","added-files-size":"#FileSize#","added-position-deletes":"2","changed-partition-count":"1","total-records":"9","total-files-size":"#FileSize#","total-data-files":"1","total-delete-files":"1","total-position-deletes":"2","total-equality-deletes":"0"}
+{"replace-partitions":"true","added-data-files":"1","deleted-data-files":"1","removed-position-delete-files":"1","removed-delete-files":"1","added-records":"3","deleted-records":"9","added-files-size":"#FileSize#","removed-files-size":"#FileSize#","removed-position-deletes":"2","changed-partition-count":"1","total-records":"3","total-files-size":"#FileSize#","total-data-files":"1","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"}
+PREHOOK: query: delete from ice01 where null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 ABC
+3 NULL
+4 POPI
+PREHOOK: query: delete from ice01 where not null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where not null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 ABC
+3 NULL
+4 POPI
+PREHOOK: query: delete from ice01 where name=null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where name=null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 ABC
+3 NULL
+4 POPI
+PREHOOK: query: delete from ice01 where name!=null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where name!=null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 ABC
+3 NULL
+4 POPI
+PREHOOK: query: delete from ice01 where name is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where name is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 ABC
+4 POPI
+PREHOOK: query: drop table if exists ice01
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: drop table if exists ice01
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 7f765270c03..44dfae08167 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -291,6 +291,16 @@ public interface HiveStorageHandler extends Configurable {
     return new HashMap<>();
   }
+  /**
+   * Returns whether the data should be overwritten for the specific operation.
+   * @param mTable the table.
+   * @param operationName name of the operation.
+   * @return true if the data should be overwritten for the specified operation.
+   */
+  default boolean shouldOverwrite(org.apache.hadoop.hive.ql.metadata.Table mTable, String operationName) {
+    return false;
+  }
+
   enum AcidSupportType {
     NONE,
     WITH_TRANSACTIONS,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index a1304a87c2f..be472df533b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -23,12 +23,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.antlr.runtime.CommonToken;
+import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -97,16 +100,31 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
   private void reparseAndSuperAnalyze(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
     List<? extends Node> children = tree.getChildren();
+    boolean shouldOverwrite = false;
+    HiveStorageHandler storageHandler = mTable.getStorageHandler();
+    if (storageHandler != null) {
+      shouldOverwrite = storageHandler.shouldOverwrite(mTable, operation.name());
+    }
+
+
     StringBuilder rewrittenQueryStr = new StringBuilder();
-    rewrittenQueryStr.append("insert into table ");
+    if (shouldOverwrite) {
+      rewrittenQueryStr.append("insert overwrite table ");
+    } else {
+      rewrittenQueryStr.append("insert into table ");
+    }
     rewrittenQueryStr.append(getFullTableNameForSQL(tabNameNode));
     addPartitionColsToInsert(mTable.getPartCols(), rewrittenQueryStr);
     ColumnAppender columnAppender = getColumnAppender(null);
     int columnOffset = columnAppender.getDeleteValues(operation).size();
-    rewrittenQueryStr.append(" select ");
-    columnAppender.appendAcidSelectColumns(rewrittenQueryStr, operation);
-    rewrittenQueryStr.setLength(rewrittenQueryStr.length() - 1);
+    if (!shouldOverwrite) {
+      rewrittenQueryStr.append(" select ");
+      columnAppender.appendAcidSelectColumns(rewrittenQueryStr, operation);
+      rewrittenQueryStr.setLength(rewrittenQueryStr.length() - 1);
+    } else {
+      rewrittenQueryStr.append(" select * ");
+    }
Map<Integer, ASTNode> setColExprs = null;
Map<String, ASTNode> setCols = null;
@@ -145,11 +163,35 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
       where = (ASTNode) children.get(whereIndex);
       assert where.getToken().getType() == HiveParser.TOK_WHERE :
           "Expected where clause, but found " + where.getName();
-    }
-    // Add a sort by clause so that the row ids come out in the correct order
-    appendSortBy(rewrittenQueryStr, columnAppender.getSortKeys());
+      if (shouldOverwrite) {
+        if (where.getChildCount() == 1) {
+
+          // Wrap the where clause condition in an isNull check: null is treated as false in a where
+          // condition, and its negation also resolves to false, so the null case must be handled explicitly.
+          ASTNode isNullFuncNodeExpr = new ASTNode(new CommonToken(HiveParser.TOK_FUNCTION, "TOK_FUNCTION"));
+          isNullFuncNodeExpr.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, "isNull")));
+          isNullFuncNodeExpr.addChild(where.getChild(0));
+
+          ASTNode orNodeExpr = new ASTNode(new CommonToken(HiveParser.KW_OR, "OR"));
+          orNodeExpr.addChild(isNullFuncNodeExpr);
+          // Add the inverted where clause condition, since we want to keep the records that don't
+          // satisfy it.
+          ASTNode notNodeExpr = new ASTNode(new CommonToken(HiveParser.KW_NOT, "!"));
+          notNodeExpr.addChild(where.getChild(0));
+          orNodeExpr.addChild(notNodeExpr);
+          where.setChild(0, orNodeExpr);
+        } else if (where.getChildCount() > 1) {
+          throw new SemanticException("Overwrite mode is not supported with more than one child in the where clause.");
+        }
+      }
+    }
+
+    if (!shouldOverwrite) {
+      // Add a sort by clause so that the row ids come out in the correct order
+      appendSortBy(rewrittenQueryStr, columnAppender.getSortKeys());
+    }
     ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
     Context rewrittenCtx = rr.rewrittenCtx;
     ASTNode rewrittenTree = rr.rewrittenTree;
@@ -162,8 +204,13 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
       rewrittenCtx.setOperation(Context.Operation.UPDATE);
       rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.UPDATE);
     } else if (deleting()) {
-      rewrittenCtx.setOperation(Context.Operation.DELETE);
-      rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.DELETE);
+      if (shouldOverwrite) {
+        // We are now actually executing an Insert query, so set the modes accordingly.
+        rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.INSERT);
+      } else {
+        rewrittenCtx.setOperation(Context.Operation.DELETE);
+        rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.DELETE);
+      }
     }
     if (where != null) {
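
Putting the analyzer change together: under copy-on-write, a DELETE is reparsed as an INSERT OVERWRITE that selects the rows to keep, with the original predicate negated and null-guarded. A conceptual HiveQL sketch of the effective rewrite for the test query above (the actual rewrite manipulates the AST, so the exact text differs):

    -- original statement:
    delete from ice01 where id > 4 or id = 2;
    -- is executed roughly as:
    insert overwrite table ice01
    select * from ice01
    where ((id > 4 or id = 2) is null) or not (id > 4 or id = 2);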