[ 
https://issues.apache.org/jira/browse/IMPALA-9338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231800#comment-17231800
 ] 

Yida Wu edited comment on IMPALA-9338 at 11/13/20, 8:19 PM:
------------------------------------------------------------

Since I am not able to repro the issue with the client's table definition, I am 
trying to reproduce the issue by using the existing tables of tpcds_parquet 
database base on the criterion:
 # Use partitioned hash join (use shuffle hint in the query)
 # The right table is much bigger than the left table so that the join order 
gets reverted.
 #  Have duplicate predicates in the ON clause

*DDL*

CREATE EXTERNAL TABLE default.cus as select * from tpcds_parquet.customer;
 CREATE EXTERNAL TABLE default.sales as select * from tpcds_parquet.store_sales;

*Query*

select * from cus b LEFT JOIN sales a ON b.c_customer_sk = a.ss_customer_sk AND 
a.ss_customer_sk = b.c_customer_sk and b.c_current_addr_sk = a.ss_addr_sk and 
b.c_first_sales_date_sk = a.ss_sold_date_sk where a.ss_sold_time_sk > 1 and 
b.c_current_addr_sk in (1,10000) and b.c_email_address like '%a%' limit 10;

*Plan*

 
{code:java}
F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B 
thread-reservation=1
PLAN-ROOT SINK
| output exprs: b.c_customer_sk, b.c_customer_id, b.c_current_cdemo_sk, 
b.c_current_hdemo_sk, b.c_current_addr_sk, b.c_first_shipto_date_sk, 
b.c_first_sales_date_sk, b.c_salutation, b.c_first_name, b.c_last_name, 
b.c_preferred_cust_flag, b.c_birth_day, b.c_birth_month, b.c_birth_year, 
b.c_birth_country, b.c_login, b.c_email_address, b.c_last_review_date, 
a.ss_sold_time_sk, a.ss_item_sk, a.ss_customer_sk, a.ss_cdemo_sk, 
a.ss_hdemo_sk, a.ss_addr_sk, a.ss_store_sk, a.ss_promo_sk, a.ss_ticket_number, 
a.ss_quantity, a.ss_wholesale_cost, a.ss_list_price, a.ss_sales_price, 
a.ss_ext_discount_amt, a.ss_ext_sales_price, a.ss_ext_wholesale_cost, 
a.ss_ext_list_price, a.ss_ext_tax, a.ss_coupon_amt, a.ss_net_paid, 
a.ss_net_paid_inc_tax, a.ss_net_profit, a.ss_sold_date_sk
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
05:EXCHANGE [UNPARTITIONED]
| limit: 10
| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
| tuple-ids=1N,0 row-size=244B cardinality=10
| in pipelines: 01(GETNEXT)
|
F02:PLAN FRAGMENT 
[HASH(b.c_current_addr_sk,b.c_customer_sk,b.c_customer_sk,b.c_first_sales_date_sk)]
 hosts=3 instances=3
Per-Host Resources: mem-estimate=17.67MB mem-reservation=5.94MB 
thread-reservation=1 runtime-filters-memory=4.00MB
02:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
| hash predicates: a.ss_addr_sk = b.c_current_addr_sk, a.ss_customer_sk = 
b.c_customer_sk, a.ss_customer_sk = b.c_customer_sk, a.ss_sold_date_sk = 
b.c_first_sales_date_sk
| fk/pk conjuncts: assumed fk/pk
| other predicates: a.ss_sold_time_sk > CAST(1 AS INT)
| runtime filters: RF000[bloom] <- b.c_current_addr_sk, RF001[bloom] <- 
b.c_customer_sk, RF002[bloom] <- b.c_customer_sk, RF003[bloom] <- 
b.c_first_sales_date_sk
| limit: 10
| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
| tuple-ids=1N,0 row-size=244B cardinality=10
| in pipelines: 01(GETNEXT), 00(OPEN)
|
|--04:EXCHANGE 
[HASH(b.c_current_addr_sk,b.c_customer_sk,b.c_customer_sk,b.c_first_sales_date_sk)]
| | mem-estimate=1.42MB mem-reservation=0B thread-reservation=0
| | tuple-ids=0 row-size=144B cardinality=9.31K
| | in pipelines: 00(GETNEXT)
| |
| F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
| Per-Host Resources: mem-estimate=48.00MB mem-reservation=8.00MB 
thread-reservation=2
| 00:SCAN HDFS [default.cus b, RANDOM]
| HDFS partitions=1/1 files=1 size=12.79MB
| predicates: b.c_current_addr_sk IN (CAST(1 AS INT), CAST(10000 AS INT)), 
b.c_email_address LIKE '%a%'
| stored statistics:
| table: rows=unavailable size=unavailable
| columns: unavailable
| extrapolated-rows=disabled max-scan-range-rows=unavailable
| mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
| tuple-ids=0 row-size=144B cardinality=9.31K
| in pipelines: 00(GETNEXT)
|
03:EXCHANGE 
[HASH(a.ss_addr_sk,a.ss_customer_sk,a.ss_customer_sk,a.ss_sold_date_sk)]
| mem-estimate=10.30MB mem-reservation=0B thread-reservation=0
| tuple-ids=1 row-size=100B cardinality=391.02K
| in pipelines: 01(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=180.00MB mem-reservation=12.00MB 
thread-reservation=2 runtime-filters-memory=4.00MB
01:SCAN HDFS [default.sales a, RANDOM]
 HDFS partitions=1/1 files=3 size=372.91MB
 predicates: a.ss_sold_time_sk > CAST(1 AS INT), a.ss_addr_sk IN (CAST(1 AS 
INT), CAST(10000 AS INT))
 runtime filters: RF000[bloom] -> a.ss_addr_sk, RF001[bloom] -> 
a.ss_customer_sk, RF002[bloom] -> a.ss_customer_sk, RF003[bloom] -> 
a.ss_sold_date_sk
 stored statistics:
 table: rows=unavailable size=unavailable
 columns: unavailable
 extrapolated-rows=disabled max-scan-range-rows=unavailable
 mem-estimate=176.00MB mem-reservation=8.00MB thread-reservation=1
 tuple-ids=1 row-size=100B cardinality=391.02K
 in pipelines: 01(GETNEXT)
----------------{code}
However the plan still looks good. Compared to the original flawed plan, a 
major difference is in the version Impala 4.0, we are using bloomfilter in the 
plan.


was (Author: baggio000):
Since I am not able to repro the issue with the client's table definition, I am 
trying to reproduce the issue by using the existing tables of tpcds_parquet 
database base on the criterion:
 # Use partitioned hash join (use shuffle hint in the query)
 # The right table is much bigger than the left table so that the join order 
gets reverted.
 #  Have duplicate predicates in the ON clause

*DDL*

CREATE EXTERNAL TABLE default.cus as select * from tpcds_parquet.customer;
CREATE EXTERNAL TABLE default.sales as select * from tpcds_parquet.store_sales;

*Query*

select * from cus b LEFT JOIN sales a ON b.c_customer_sk = a.ss_customer_sk AND 
a.ss_customer_sk = b.c_customer_sk and b.c_current_addr_sk = a.ss_addr_sk and 
b.c_first_sales_date_sk = a.ss_sold_date_sk where a.ss_sold_time_sk > 1 and 
b.c_current_addr_sk in (1,10000) and b.c_email_address like '%a%' limit 10;

*Plan*

F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B 
thread-reservation=1
PLAN-ROOT SINK
| output exprs: b.c_customer_sk, b.c_customer_id, b.c_current_cdemo_sk, 
b.c_current_hdemo_sk, b.c_current_addr_sk, b.c_first_shipto_date_sk, 
b.c_first_sales_date_sk, b.c_salutation, b.c_first_name, b.c_last_name, 
b.c_preferred_cust_flag, b.c_birth_day, b.c_birth_month, b.c_birth_year, 
b.c_birth_country, b.c_login, b.c_email_address, b.c_last_review_date, 
a.ss_sold_time_sk, a.ss_item_sk, a.ss_customer_sk, a.ss_cdemo_sk, 
a.ss_hdemo_sk, a.ss_addr_sk, a.ss_store_sk, a.ss_promo_sk, a.ss_ticket_number, 
a.ss_quantity, a.ss_wholesale_cost, a.ss_list_price, a.ss_sales_price, 
a.ss_ext_discount_amt, a.ss_ext_sales_price, a.ss_ext_wholesale_cost, 
a.ss_ext_list_price, a.ss_ext_tax, a.ss_coupon_amt, a.ss_net_paid, 
a.ss_net_paid_inc_tax, a.ss_net_profit, a.ss_sold_date_sk
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
05:EXCHANGE [UNPARTITIONED]
| limit: 10
| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
| tuple-ids=1N,0 row-size=244B cardinality=10
| in pipelines: 01(GETNEXT)
|
F02:PLAN FRAGMENT 
[*HASH(b.c_current_addr_sk,b.c_customer_sk,b.c_customer_sk,b.c_first_sales_date_sk)*]
 hosts=3 instances=3
Per-Host Resources: mem-estimate=17.67MB mem-reservation=5.94MB 
thread-reservation=1 runtime-filters-memory=4.00MB
02:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
| hash predicates: a.ss_addr_sk = b.c_current_addr_sk, a.ss_customer_sk = 
b.c_customer_sk, a.ss_customer_sk = b.c_customer_sk, a.ss_sold_date_sk = 
b.c_first_sales_date_sk
| fk/pk conjuncts: assumed fk/pk
| other predicates: a.ss_sold_time_sk > CAST(1 AS INT)
| runtime filters: RF000[bloom] <- b.c_current_addr_sk, RF001[bloom] <- 
b.c_customer_sk, RF002[bloom] <- b.c_customer_sk, RF003[bloom] <- 
b.c_first_sales_date_sk
| limit: 10
| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
| tuple-ids=1N,0 row-size=244B cardinality=10
| in pipelines: 01(GETNEXT), 00(OPEN)
|
|--04:EXCHANGE 
[HASH(b.c_current_addr_sk,b.c_customer_sk,b.c_customer_sk,b.c_first_sales_date_sk)]
| | mem-estimate=1.42MB mem-reservation=0B thread-reservation=0
| | tuple-ids=0 row-size=144B cardinality=9.31K
| | in pipelines: 00(GETNEXT)
| |
| F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
| Per-Host Resources: mem-estimate=48.00MB mem-reservation=8.00MB 
thread-reservation=2
| 00:SCAN HDFS [default.cus b, RANDOM]
| HDFS partitions=1/1 files=1 size=12.79MB
| predicates: b.c_current_addr_sk IN (CAST(1 AS INT), CAST(10000 AS INT)), 
b.c_email_address LIKE '%a%'
| stored statistics:
| table: rows=unavailable size=unavailable
| columns: unavailable
| extrapolated-rows=disabled max-scan-range-rows=unavailable
| mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
| tuple-ids=0 row-size=144B cardinality=9.31K
| in pipelines: 00(GETNEXT)
|
03:EXCHANGE 
[*HASH(a.ss_addr_sk,a.ss_customer_sk,a.ss_customer_sk,a.ss_sold_date_sk)*]
| mem-estimate=10.30MB mem-reservation=0B thread-reservation=0
| tuple-ids=1 row-size=100B cardinality=391.02K
| in pipelines: 01(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=180.00MB mem-reservation=12.00MB 
thread-reservation=2 runtime-filters-memory=4.00MB
01:SCAN HDFS [default.sales a, RANDOM]
 HDFS partitions=1/1 files=3 size=372.91MB
 predicates: a.ss_sold_time_sk > CAST(1 AS INT), a.ss_addr_sk IN (CAST(1 AS 
INT), CAST(10000 AS INT))
 runtime filters: RF000[bloom] -> a.ss_addr_sk, RF001[bloom] -> 
a.ss_customer_sk, RF002[bloom] -> a.ss_customer_sk, RF003[bloom] -> 
a.ss_sold_date_sk
 stored statistics:
 table: rows=unavailable size=unavailable
 columns: unavailable
 extrapolated-rows=disabled max-scan-range-rows=unavailable
 mem-estimate=176.00MB mem-reservation=8.00MB thread-reservation=1
 tuple-ids=1 row-size=100B cardinality=391.02K
 in pipelines: 01(GETNEXT)
----------------

However the plan still looks good. Compared to the original flawed plan, a 
major difference is in the version Impala 4.0, we are using bloomfilter in the 
plan.

> Impala crashing in impala::RowDescriptor::TupleIsNullable(int)
> --------------------------------------------------------------
>
>                 Key: IMPALA-9338
>                 URL: https://issues.apache.org/jira/browse/IMPALA-9338
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Backend
>    Affects Versions: Impala 3.3.0
>            Reporter: Abhishek Rawat
>            Assignee: Yida Wu
>            Priority: Blocker
>              Labels: crash
>
> Repro:
> {code:java}
> create database default;
> CREATE EXTERNAL TABLE default.dimension ( ssn_id INT, act_num CHAR(1), eff_dt 
> CHAR(10), seq_num SMALLINT, entry_dt CHAR(10), map ARRAY<INT>, src CHAR(10), 
> msg CHAR(1), msg_num CHAR(3), remarks CHAR(3), description CHAR(26), 
> default_load_ts CHAR(26), map_cd VARCHAR(50) ) PARTITIONED BY ( year INT, 
> ssn_hash INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u001C' WITH 
> SERDEPROPERTIES ('colelction.delim'=',', 'field.delim'='\u001C', 
> 'serialization.format'='\u001C') STORED AS PARQUET --LOCATION 
> 'hdfs://prdnameservice/user/hive/warehouse/default.db/dimension' 
> TBLPROPERTIES ('DO_NOT_UPDATE_STATS'='true', 'STATS_GENERATED'='TASK', 
> 'STATS_GENERATED_VIA_STATS_TASK'='true', 
> 'impala.lastComputeStatsTime'='1579246708', 'last_modified_by'='a00811p', 
> 'last_modified_time'='1489791214', 'numRows'='7357715311', 
> 'totalSize'='235136295799');
> CREATE EXTERNAL TABLE default.fact ( ssn_id_n INT, bor_act_sfx CHAR(1), 
> start_dt CHAR(10), seq_num SMALLINT, msg_n CHAR(8), end_dt CHAR(10), reviews 
> CHAR(50), description CHAR(50), detail CHAR(50), default_load_ts CHAR(26) ) 
> PARTITIONED BY ( year INT, ssn_hash INT ) ROW FORMAT DELIMITED FIELDS 
> TERMINATED BY '\u0016' WITH SERDEPROPERTIES ('field.delim'='\u0016', 
> 'serialization.format'='\u0016') STORED AS PARQUET --LOCATION 
> 'hdfs://prdnameservice/user/hive/warehouse/default.db/fact' TBLPROPERTIES 
> ('DO_NOT_UPDATE_STATS'='true', 'STATS_GENERATED'='TASK', 
> 'STATS_GENERATED_VIA_STATS_TASK'='true', 
> 'impala.lastComputeStatsTime'='1579242111', 'last_modified_by'='e32940', 
> 'last_modified_time'='1484186332', 'numRows'='5142832439', 
> 'totalSize'='105397898347'); 
> use default;
> select ssn_id_n, bor_act_sfx, amap.item, start_dt, reviews, concat(msg, 
> msg_num) corr_code from dimension, dimension.map amap LEFT JOIN fact ON 
> dimension.ssn_id = fact.ssn_id_n AND dimension.act_num = fact.bor_act_sfx AND 
> dimension.eff_dt = fact.start_dt and dimension.year = fact.year --and 
> dimension.month(cast(eff_dt as timestamp)) = fact.month(cast(start_dt as 
> timestamp)) AND dimension.YEAR = fact.YEAR AND fact.year in (2018,2019) where 
> dimension.msg like '%B295%' AND dimension.year in (2018,2019);{code}
> Stack Trace:
> {code:java}
> #0 0x0000000000f8b1b9 in impala::RowDescriptor::TupleIsNullable(int) const () 
> #1 0x000000000130911f in impala::SlotRef::Init(impala::RowDescriptor const&, 
> impala::RuntimeState*) () 
> #2 0x000000000130748e in impala::ScalarExpr::Create(impala::TExpr const&, 
> impala::RowDescriptor const&, impala::RuntimeState*, impala::ObjectPool*, 
> impala::ScalarExpr**) () 
> #3 0x00000000013075e5 in 
> impala::ScalarExpr::Create(std::vector<impala::TExpr, 
> std::allocator<impala::TExpr> > const&, impala::RowDescriptor const&, 
> impala::RuntimeState*, impala::ObjectPool*, std::vector<impala::ScalarExpr*, 
> std::allocator<impala::ScalarExpr*> >*) () 
> #4 0x000000000130769f in 
> impala::ScalarExpr::Create(std::vector<impala::TExpr, 
> std::allocator<impala::TExpr> > const&, impala::RowDescriptor const&, 
> impala::RuntimeState*, std::vector<impala::ScalarExpr*, 
> std::allocator<impala::ScalarExpr*> >*) () 
> #5 0x000000000149c1aa in 
> impala::KrpcDataStreamSender::Init(std::vector<impala::TExpr, 
> std::allocator<impala::TExpr> > const&, impala::TDataSink const&, 
> impala::RuntimeState*) () 
> #6 0x0000000001208ad3 in impala::DataSink::Create(impala::TPlanFragmentCtx 
> const&, impala::TPlanFragmentInstanceCtx const&, impala::RowDescriptor 
> const*, impala::RuntimeState*, impala::DataSink**) () 
> #7 0x0000000000fac9a4 in impala::FragmentInstanceState::Prepare() () 
> #8 0x0000000000fad3dd in impala::FragmentInstanceState::Exec() () 
> #9 0x0000000000f98e77 in 
> impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) () 
> #10 0x00000000011a1490 in impala::Thread::SuperviseThread(std::string const&, 
> std::string const&, boost::function<void ()>, impala::ThreadDebugInfo const*, 
> impala::Promise<long, (impala::PromiseMode)0>*) () 
> #11 0x00000000011a203a in boost::detail::thread_data<boost::_bi::bind_t<void, 
> void (std::string const&, std::string const&, boost::function<void ()>, 
> impala::ThreadDebugInfo const*, impala::Promise<long, 
> (impala::PromiseMode)0>), boost::_bi::list5<boost::_bi::value<std::string>, 
> boost::_bi::value<std::string>, boost::_bi::value<boost::function<void ()> >, 
> boost::_bi::value<impala::ThreadDebugInfo>, 
> boost::_bi::value<impala::Promise<long, (impala::PromiseMode)0>*> > > 
> >::run() () 
> #12 0x00000000017909ca in thread_proxy () #13 0x00007f8832fa6aa1 in 
> __pthread_initialize_minimal_internal () from /lib64/libpthread.so.0 #14 
> 0x0000000000000000 in ?? ()
> {code}
>  
> The crash only happens when ROJ plan is selected. If, LOJ plan is selected 
> the query runs successfully.
> Initial investigation indicates that the Scalar expression being contructed 
> in the above stack trace is referencing an invalid tupleId in the row 
> descriptor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to