This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6a8e20a5836 HIVE-26811: SharedWorkOptimizer: take the union of virtual
columns in mergeable TableScans (Denys Kuzmenko, reviewed by Krisztian Kasa)
6a8e20a5836 is described below
commit 6a8e20a5836519e33e2eb7b7f71a250f09874eeb
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon Dec 12 10:23:00 2022 +0200
HIVE-26811: SharedWorkOptimizer: take the union of virtual columns in
mergeable TableScans (Denys Kuzmenko, reviewed by Krisztian Kasa)
Closes #3837
---
.../test/queries/positive/iceberg_merge_schema.q | 70 +++++++++
.../results/positive/iceberg_merge_schema.q.out | 175 +++++++++++++++++++++
.../hive/ql/optimizer/SharedWorkOptimizer.java | 8 +-
3 files changed, 252 insertions(+), 1 deletion(-)
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q
new file mode 100644
index 00000000000..8b5bb00dbad
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q
@@ -0,0 +1,70 @@
+-- SORT_QUERY_RESULTS
+set hive.optimize.shared.work.merge.ts.schema=true;
+
+CREATE EXTERNAL TABLE calls (
+ s_key bigint,
+ year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2');
+
+INSERT INTO calls (s_key, year) VALUES (1090969, 2022);
+
+
+CREATE EXTERNAL TABLE display (
+ skey bigint,
+ hierarchy_number string,
+ hierarchy_name string,
+ language_id int,
+ hierarchy_display string,
+ orderby string
+)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2');
+
+INSERT INTO display (skey, language_id, hierarchy_display) VALUES
+ (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+ (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1');
+
+
+MERGE INTO display USING (
+ SELECT distinct display_skey, display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub1
+
+ UNION ALL
+
+ SELECT distinct display_skey, null as display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub2
+) sub
+ON display.skey = sub.display_skey
+ and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+ UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+ INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'));
+
+
+SELECT * FROM display;
+
+-- clean up
+DROP TABLE calls;
+DROP TABLE display;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out
new file mode 100644
index 00000000000..537c4472966
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out
@@ -0,0 +1,175 @@
+PREHOOK: query: CREATE EXTERNAL TABLE calls (
+ s_key bigint,
+ year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@calls
+POSTHOOK: query: CREATE EXTERNAL TABLE calls (
+ s_key bigint,
+ year int
+) PARTITIONED BY SPEC (year)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@calls
+PREHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@calls
+POSTHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@calls
+PREHOOK: query: CREATE EXTERNAL TABLE display (
+ skey bigint,
+ hierarchy_number string,
+ hierarchy_name string,
+ language_id int,
+ hierarchy_display string,
+ orderby string
+)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@display
+POSTHOOK: query: CREATE EXTERNAL TABLE display (
+ skey bigint,
+ hierarchy_number string,
+ hierarchy_name string,
+ language_id int,
+ hierarchy_display string,
+ orderby string
+)
+STORED BY Iceberg STORED AS parquet
+TBLPROPERTIES ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@display
+PREHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES
+ (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+ (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@display
+POSTHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES
+ (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+ (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@display
+Warning: Shuffle Join MERGEJOIN[62][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[63][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product
+PREHOOK: query: MERGE INTO display USING (
+ SELECT distinct display_skey, display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub1
+
+ UNION ALL
+
+ SELECT distinct display_skey, null as display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub2
+) sub
+ON display.skey = sub.display_skey
+ and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+ UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+ INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+PREHOOK: type: QUERY
+PREHOOK: Input: default@calls
+PREHOOK: Input: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@merge_tmp_table
+POSTHOOK: query: MERGE INTO display USING (
+ SELECT distinct display_skey, display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub1
+
+ UNION ALL
+
+ SELECT distinct display_skey, null as display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub2
+) sub
+ON display.skey = sub.display_skey
+ and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+ UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+ INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@calls
+POSTHOOK: Input: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(display)display.null, ]
+PREHOOK: query: SELECT * FROM display
+PREHOOK: type: QUERY
+PREHOOK: Input: default@display
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM display
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@display
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergenew1 NULL
+1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergeupdated1 NULL
+1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergenew1 NULL
+1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergeupdated1 NULL
+PREHOOK: query: DROP TABLE calls
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@calls
+PREHOOK: Output: default@calls
+POSTHOOK: query: DROP TABLE calls
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@calls
+POSTHOOK: Output: default@calls
+PREHOOK: query: DROP TABLE display
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@display
+PREHOOK: Output: default@display
+POSTHOOK: query: DROP TABLE display
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@display
+POSTHOOK: Output: default@display
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index 7dc64e71d25..63a2cf0b649 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph;
import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph.Cluster;
import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph.EdgeType;
@@ -215,7 +216,7 @@ public class SharedWorkOptimizer extends Transform {
}
- if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
+ if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
// Try to reuse cache for broadcast side in mapjoin operators that
// share same input.
// First we group together all the mapjoin operators that share same
@@ -680,6 +681,11 @@ public class SharedWorkOptimizer extends Transform {
retainableTsOp.getConf().getNeededColumns().add(col);
}
}
+ for (VirtualColumn col : discardableTsOp.getConf().getVirtualCols()) {
+ if (!retainableTsOp.getConf().getVirtualCols().contains(col)) {
+ retainableTsOp.getConf().getVirtualCols().add(col);
+ }
+ }
}
private static boolean compatibleSchema(TableScanOperator tsOp1, TableScanOperator tsOp2) {