This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8e20a5836 HIVE-26811: SharedWorkOptimizer: take the union of virtual columns in mergeable TableScans (Denys Kuzmenko, reviewed by Krisztian Kasa)
6a8e20a5836 is described below

commit 6a8e20a5836519e33e2eb7b7f71a250f09874eeb
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon Dec 12 10:23:00 2022 +0200

    HIVE-26811: SharedWorkOptimizer: take the union of virtual columns in mergeable TableScans (Denys Kuzmenko, reviewed by Krisztian Kasa)
    
    Closes #3837
---
 .../test/queries/positive/iceberg_merge_schema.q   |  70 +++++++++
 .../results/positive/iceberg_merge_schema.q.out    | 175 +++++++++++++++++++++
 .../hive/ql/optimizer/SharedWorkOptimizer.java     |   8 +-
 3 files changed, 252 insertions(+), 1 deletion(-)

diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q
new file mode 100644
index 00000000000..8b5bb00dbad
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q
@@ -0,0 +1,70 @@
+-- SORT_QUERY_RESULTS
+set hive.optimize.shared.work.merge.ts.schema=true;
+
+CREATE EXTERNAL TABLE calls (
+  s_key bigint, 
+  year int
+) PARTITIONED BY SPEC (year)  
+STORED BY Iceberg STORED AS parquet 
+TBLPROPERTIES ('format-version'='2');
+  
+INSERT INTO calls (s_key, year) VALUES (1090969, 2022);
+
+
+CREATE EXTERNAL TABLE display (                   
+  skey bigint,                                   
+  hierarchy_number string,                       
+  hierarchy_name string,                         
+  language_id int,                               
+  hierarchy_display string,                      
+  orderby string
+)                                
+STORED BY Iceberg STORED AS parquet 
+TBLPROPERTIES ('format-version'='2'); 
+
+INSERT INTO display (skey, language_id, hierarchy_display) VALUES 
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1');
+
+
+MERGE INTO display USING (
+  SELECT distinct display_skey, display, display as orig_display 
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM ( 
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R 
+    INNER JOIN display D 
+      ON R.s_key = D.skey AND D.language_id = 3 
+    GROUP BY D.skey, 
+      D.hierarchy_display
+  ) sub1 
+
+  UNION ALL 
+  
+  SELECT distinct display_skey, null as display, display as orig_display 
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM ( 
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R 
+    INNER JOIN display D 
+      ON R.s_key = D.skey AND D.language_id = 3 
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub2
+) sub 
+ON display.skey = sub.display_skey 
+    and display.hierarchy_display = sub.display 
+
+WHEN MATCHED THEN 
+  UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') 
+WHEN NOT MATCHED THEN 
+  INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'));
+
+
+SELECT * FROM display;
+    
+-- clean up    
+DROP TABLE calls;
+DROP TABLE display;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out
new file mode 100644
index 00000000000..537c4472966
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out
@@ -0,0 +1,175 @@
+PREHOOK: query: CREATE EXTERNAL TABLE calls (
+  s_key bigint, 
+  year int
+) PARTITIONED BY SPEC (year)  
+STORED BY Iceberg STORED AS parquet 
+TBLPROPERTIES ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@calls
+POSTHOOK: query: CREATE EXTERNAL TABLE calls (
+  s_key bigint, 
+  year int
+) PARTITIONED BY SPEC (year)  
+STORED BY Iceberg STORED AS parquet 
+TBLPROPERTIES ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@calls
+PREHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@calls
+POSTHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@calls
+PREHOOK: query: CREATE EXTERNAL TABLE display (                   
+  skey bigint,                                   
+  hierarchy_number string,                       
+  hierarchy_name string,                         
+  language_id int,                               
+  hierarchy_display string,                      
+  orderby string
+)                                
+STORED BY Iceberg STORED AS parquet 
+TBLPROPERTIES ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@display
+POSTHOOK: query: CREATE EXTERNAL TABLE display (                   
+  skey bigint,                                   
+  hierarchy_number string,                       
+  hierarchy_name string,                         
+  language_id int,                               
+  hierarchy_display string,                      
+  orderby string
+)                                
+STORED BY Iceberg STORED AS parquet 
+TBLPROPERTIES ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@display
+PREHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES 
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@display
+POSTHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES 
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
+  (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@display
+Warning: Shuffle Join MERGEJOIN[62][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[63][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product
+PREHOOK: query: MERGE INTO display USING (
+  SELECT distinct display_skey, display, display as orig_display 
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM ( 
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R 
+    INNER JOIN display D 
+      ON R.s_key = D.skey AND D.language_id = 3 
+    GROUP BY D.skey, 
+      D.hierarchy_display
+  ) sub1 
+
+  UNION ALL 
+  
+  SELECT distinct display_skey, null as display, display as orig_display 
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM ( 
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R 
+    INNER JOIN display D 
+      ON R.s_key = D.skey AND D.language_id = 3 
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub2
+) sub 
+ON display.skey = sub.display_skey 
+    and display.hierarchy_display = sub.display 
+
+WHEN MATCHED THEN 
+  UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') 
+WHEN NOT MATCHED THEN 
+  INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+PREHOOK: type: QUERY
+PREHOOK: Input: default@calls
+PREHOOK: Input: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@merge_tmp_table
+POSTHOOK: query: MERGE INTO display USING (
+  SELECT distinct display_skey, display, display as orig_display 
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM ( 
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R 
+    INNER JOIN display D 
+      ON R.s_key = D.skey AND D.language_id = 3 
+    GROUP BY D.skey, 
+      D.hierarchy_display
+  ) sub1 
+
+  UNION ALL 
+  
+  SELECT distinct display_skey, null as display, display as orig_display 
+  FROM (
+    SELECT D.skey display_skey, D.hierarchy_display display
+    FROM ( 
+      SELECT s_key FROM calls WHERE s_key =  1090969
+    ) R 
+    INNER JOIN display D 
+      ON R.s_key = D.skey AND D.language_id = 3 
+    GROUP BY D.skey,
+      D.hierarchy_display
+  ) sub2
+) sub 
+ON display.skey = sub.display_skey 
+    and display.hierarchy_display = sub.display 
+
+WHEN MATCHED THEN 
+  UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') 
+WHEN NOT MATCHED THEN 
+  INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@calls
+POSTHOOK: Input: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(display)display.null, ]
+PREHOOK: query: SELECT * FROM display
+PREHOOK: type: QUERY
+PREHOOK: Input: default@display
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM display
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@display
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1090969        NULL    NULL    3       f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergenew1    NULL
+1090969        NULL    NULL    3       f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergeupdated1        NULL
+1090969        NULL    NULL    3       f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergenew1       NULL
+1090969        NULL    NULL    3       f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergeupdated1   NULL
+PREHOOK: query: DROP TABLE calls
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@calls
+PREHOOK: Output: default@calls
+POSTHOOK: query: DROP TABLE calls
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@calls
+POSTHOOK: Output: default@calls
+PREHOOK: query: DROP TABLE display
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@display
+PREHOOK: Output: default@display
+POSTHOOK: query: DROP TABLE display
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@display
+POSTHOOK: Output: default@display
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index 7dc64e71d25..63a2cf0b649 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph;
 import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph.Cluster;
 import org.apache.hadoop.hive.ql.optimizer.graph.OperatorGraph.EdgeType;
@@ -215,7 +216,7 @@ public class SharedWorkOptimizer extends Transform {
 
     }
 
-    if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
       // Try to reuse cache for broadcast side in mapjoin operators that
       // share same input.
       // First we group together all the mapjoin operators that share same
@@ -680,6 +681,11 @@ public class SharedWorkOptimizer extends Transform {
         retainableTsOp.getConf().getNeededColumns().add(col);
       }
     }
+    for (VirtualColumn col : discardableTsOp.getConf().getVirtualCols()) {
+      if (!retainableTsOp.getConf().getVirtualCols().contains(col)) {
+        retainableTsOp.getConf().getVirtualCols().add(col);
+      }
+    }
   }
 
   private static boolean compatibleSchema(TableScanOperator tsOp1, TableScanOperator tsOp2) {

Reply via email to