This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 9966c3d298 HIVE-26128: Enabling dynamic runtime filtering in Iceberg tables throws exception at runtime (#3404) (Adam Szita, reviewed by Peter Vary)
9966c3d298 is described below
commit 9966c3d2986aeb20fb3698f894c1ca32b390bfa8
Author: Adam Szita <[email protected]>
AuthorDate: Tue Jun 28 09:45:50 2022 +0200
    HIVE-26128: Enabling dynamic runtime filtering in Iceberg tables throws exception at runtime (#3404) (Adam Szita, reviewed by Peter Vary)
---
data/conf/iceberg/tez/tez-site.xml | 8 +-
.../iceberg/mr/hive/HiveIcebergFilterFactory.java | 12 +-
.../queries/positive/dynamic_semijoin_reduction.q | 38 ++++
.../positive/dynamic_semijoin_reduction.q.out | 214 +++++++++++++++++++++
4 files changed, 266 insertions(+), 6 deletions(-)
diff --git a/data/conf/iceberg/tez/tez-site.xml b/data/conf/iceberg/tez/tez-site.xml
index 7ad5ad4c66..467bfb4dad 100644
--- a/data/conf/iceberg/tez/tez-site.xml
+++ b/data/conf/iceberg/tez/tez-site.xml
@@ -1,8 +1,4 @@
 <configuration>
-  <property>
-    <name>tez.am.resource.memory.mb</name>
-    <value>128</value>
-  </property>
   <property>
     <name>tez.am.dag.scheduler.class</name>
     <value>org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled</value>
@@ -11,4 +7,8 @@
     <name>tez.am.resource.memory.mb</name>
     <value>256</value>
   </property>
+  <property>
+    <name>hive.tez.container.size</name>
+    <value>512</value>
+  </property>
 </configuration>
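The hunks above drop the older 128 MB definition of tez.am.resource.memory.mb (the 256 MB definition remains as the single entry) and give Hive-on-Tez task containers an explicit 512 MB size, presumably so the test vertices below have enough memory. A minimal sketch of applying the same settings programmatically, assuming a Hadoop Configuration-based test harness (the property names come from the XML above; the class and method names are illustrative only):

    // Sketch only: mirrors the tez-site.xml settings above in code.
    import org.apache.hadoop.conf.Configuration;

    public class TezTestConf {
      public static Configuration create() {
        Configuration conf = new Configuration();
        // Single Tez AM memory entry, matching the surviving 256 MB property.
        conf.setInt("tez.am.resource.memory.mb", 256);
        // Explicit container size for Hive-on-Tez tasks, added by this change.
        conf.setInt("hive.tez.container.size", 512);
        return conf;
      }
    }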
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 1559325c9a..6101ad159a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -107,8 +107,16 @@ public class HiveIcebergFilterFactory {
         return in(column, leafToLiteralList(leaf));
       case BETWEEN:
         List<Object> icebergLiterals = leafToLiteralList(leaf);
-        return and(greaterThanOrEqual(column, icebergLiterals.get(0)),
-            lessThanOrEqual(column, icebergLiterals.get(1)));
+        if (icebergLiterals.size() == 2) {
+          return and(greaterThanOrEqual(column, icebergLiterals.get(0)),
+              lessThanOrEqual(column, icebergLiterals.get(1)));
+        } else {
+          // If the semijoin reduction optimization was applied, there will be a BETWEEN(DynamicValue, DynamicValue)
+          // clause, whose DynamicValues are not evaluable in the Tez AM, where the Hive filter is translated into an
+          // Iceberg filter. Overwrite it with constant true: the optimization will be utilized by Hive/Tez and is a
+          // no-op for Iceberg. (Also: the original filter and the Iceberg filter are both part of the JobConf on the execution side.)
+          return Expressions.alwaysTrue();
+        }
       case IS_NULL:
         return isNull(column);
       default:
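For readability, here is the new BETWEEN handling from the hunk above as a condensed, self-contained sketch. The method name translateBetween and the class wrapper are hypothetical; the Iceberg Expressions calls are the ones the factory already uses:

    import java.util.List;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class BetweenTranslationSketch {
      // Hypothetical standalone version of the BETWEEN branch above.
      static Expression translateBetween(String column, List<Object> literals) {
        if (literals.size() == 2) {
          // Both bounds are concrete constants: lo <= column <= hi.
          return Expressions.and(
              Expressions.greaterThanOrEqual(column, literals.get(0)),
              Expressions.lessThanOrEqual(column, literals.get(1)));
        }
        // Semijoin reduction produced BETWEEN(DynamicValue, DynamicValue); the
        // bounds only materialize at task execution time, so the Tez AM cannot
        // evaluate them while translating the Hive filter to an Iceberg filter.
        // Returning alwaysTrue() makes the predicate a planning-time no-op;
        // Hive/Tez still applies the real runtime filter on the executors.
        return Expressions.alwaysTrue();
      }
    }

The dynamic_semijoin_reduction.q test below exercises exactly this path: its plan contains key BETWEEN DynamicValue(...) AND DynamicValue(...), which previously made the translation throw an exception at runtime.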
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q
new file mode 100644
index 0000000000..035b5525f8
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q
@@ -0,0 +1,38 @@
+--! qt:dataset:srcpart
+--! qt:dataset:alltypesorc
+set hive.compute.query.using.stats=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.tez.dynamic.semijoin.reduction=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.stats.autogather=true;
+set hive.tez.bigtable.minsize.semijoin.reduction=1;
+set hive.tez.min.bloom.filter.entries=1;
+set hive.stats.fetch.column.stats=true;
+set hive.tez.bloom.filter.factor=1.0f;
+
+-- Create Tables
+create table srcpart_date_n7 (key string, value string) partitioned by (ds string ) stored by iceberg;
+CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg;
+
+-- Add Partitions
+--alter table srcpart_date_n7 add partition (ds = "2008-04-08");
+--alter table srcpart_date_n7 add partition (ds = "2008-04-09");
+
+--alter table srcpart_small_n3 add partition (ds = "2008-04-08");
+--alter table srcpart_small_n3 add partition (ds = "2008-04-09");
+
+-- Load
+insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08";
+insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09";
+insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20;
+
+EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1);
+select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1);
+
+drop table srcpart_date_n7;
+drop table srcpart_small_n3;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out
new file mode 100644
index 0000000000..ef5effcb09
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out
@@ -0,0 +1,214 @@
+PREHOOK: query: create table srcpart_date_n7 (key string, value string) partitioned by (ds string ) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: create table srcpart_date_n7 (key string, value string) partitioned by (ds string ) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_small_n3
+PREHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_small_n3
+PREHOOK: query: EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Reducer 5 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_date_n7
+                  filterExpr: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_n3_key1_min) AND DynamicValue(RS_7_srcpart_small_n3_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_n3_key1_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_n3_key1_min) AND DynamicValue(RS_7_srcpart_small_n3_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_n3_key1_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+                      Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+ Execution mode: vectorized
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: srcpart_small_n3
+ filterExpr: key1 is not null (type: boolean)
+                  Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key1 is not null (type: boolean)
+                    Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key1 (type: string)
+ outputColumnNames: _col0
+                      Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+                        Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=20)
+ minReductionHashAggr: 0.95
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ null sort order:
+ sort order:
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+ Execution mode: vectorized
+ Reducer 2
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+                Statistics: Num rows: 126 Data size: 1008 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: count()
+ minReductionHashAggr: 0.99
+ mode: hash
+ outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ null sort order:
+ sort order:
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=20)
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ null sort order:
+ sort order:
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+176
+PREHOOK: query: drop table srcpart_date_n7
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: drop table srcpart_date_n7
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: drop table srcpart_small_n3
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: drop table srcpart_small_n3
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: default@srcpart_small_n3