[ 
https://issues.apache.org/jira/browse/FLINK-24300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415595#comment-17415595
 ] 

Yun Gao edited comment on FLINK-24300 at 9/15/21, 4:24 PM:
-----------------------------------------------------------

From the attached _jstack.txt_, the stacks of a single task look like this:
{code:java}
"Source Data Fetcher for MultipleInput(readOrder=[0,1,1], 
members=[\nHashJoin(joinType=[InnerJoin], where=[(d_date_sk = 
ws_sold_date_sk)], select=[ws_sold_date_sk, ws_ext_sales_price, d_date_sk, 
d_week_seq, d_day_name], isBroadcast=[true], build=[right])\n:- 
Union(all=[true], union=[ws_sold_date_sk, ws_ext_sales_price])\n:  :- [#2] 
TableSourceScan(table=[[hive, tpcds_bin_orc_10000, web_sales, 
project=[ws_sold_date_sk, ws_ext_sales_price]]], fields=[ws_sold_date_sk, 
ws_ext_sales_price])\n:  +- [#3] TableSourceScan(table=[[hive, 
tpcds_bin_orc_10000, catalog_sales, project=[cs_sold_date_sk, 
cs_ext_sales_price]]], fields=[cs_sold_date_sk, cs_ext_sales_price])\n+- [#1] 
Exchange(distribution=[broadcast])\n]) [Source: 
HiveSource-tpcds_bin_orc_10000.web_sales, Source: 
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq, 
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"), ws_ext_sales_price, null:DOUBLE) AS $f1, CASE((d_day_name = 
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f2, CASE((d_day_name = 
_UTF-16LE'Tuesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f3, CASE((d_day_name = 
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f4, CASE((d_day_name = 
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f5, CASE((d_day_name = 
_UTF-16LE'Friday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f6, CASE((d_day_name = 
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f7]) -> 
LocalHashAggregate(groupBy=[d_week_seq], select=[d_week_seq, Partial_SUM($f1) 
AS sum$0, Partial_SUM($f2) AS sum$1, Partial_SUM($f3) AS sum$2, 
Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS sum$4, Partial_SUM($f6) AS 
sum$5, Partial_SUM($f7) AS sum$6]) (548/1050)#0" #217 prio=5 os_prio=0 
tid=0x00007fcda518f000 nid=0x1632d runnable [0x00007fcc9e0ed000]
   java.lang.Thread.State: RUNNABLE
        at 
java.util.concurrent.CompletableFuture.cleanStack(CompletableFuture.java:497)
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:567)
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:1068)
        at 
java.util.concurrent.CompletableFuture$OrRelay.tryFire(CompletableFuture.java:1549)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.moveToAvailable(FutureCompletingBlockingQueue.java:173)
        at 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.enqueue(FutureCompletingBlockingQueue.java:347)
        at 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.put(FutureCompletingBlockingQueue.java:211)
--
"Source Data Fetcher for MultipleInput(readOrder=[0,1,1], 
members=[\nHashJoin(joinType=[InnerJoin], where=[(d_date_sk = 
ws_sold_date_sk)], select=[ws_sold_date_sk, ws_ext_sales_price, d_date_sk, 
d_week_seq, d_day_name], isBroadcast=[true], build=[right])\n:- 
Union(all=[true], union=[ws_sold_date_sk, ws_ext_sales_price])\n:  :- [#2] 
TableSourceScan(table=[[hive, tpcds_bin_orc_10000, web_sales, 
project=[ws_sold_date_sk, ws_ext_sales_price]]], fields=[ws_sold_date_sk, 
ws_ext_sales_price])\n:  +- [#3] TableSourceScan(table=[[hive, 
tpcds_bin_orc_10000, catalog_sales, project=[cs_sold_date_sk, 
cs_ext_sales_price]]], fields=[cs_sold_date_sk, cs_ext_sales_price])\n+- [#1] 
Exchange(distribution=[broadcast])\n]) [Source: 
HiveSource-tpcds_bin_orc_10000.web_sales, Source: 
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq, 
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"), ws_ext_sales_price, null:DOUBLE) AS $f1, CASE((d_day_name = 
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f2, CASE((d_day_name = 
_UTF-16LE'Tuesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f3, CASE((d_day_name = 
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f4, CASE((d_day_name = 
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f5, CASE((d_day_name = 
_UTF-16LE'Friday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f6, CASE((d_day_name = 
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f7]) -> 
LocalHashAggregate(groupBy=[d_week_seq], select=[d_week_seq, Partial_SUM($f1) 
AS sum$0, Partial_SUM($f2) AS sum$1, Partial_SUM($f3) AS sum$2, 
Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS sum$4, Partial_SUM($f6) AS 
sum$5, Partial_SUM($f7) AS sum$6]) (548/1050)#0" #216 prio=5 os_prio=0 
tid=0x00007fcda51be000 nid=0x16327 waiting on condition [0x00007fcc9e1ee000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000005067ffcf0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at org.apache.flink.connector.file.src.util.Pool.pollEntry(Pool.java:82)
        at 
org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.getCachedEntry(AbstractOrcFileInputFormat.java:292)
        at 
org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.readBatch(AbstractOrcFileInputFormat.java:256)
        at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
--
"MultipleInput(readOrder=[0,1,1], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(d_date_sk = ws_sold_date_sk)], select=[ws_sold_date_sk, 
ws_ext_sales_price, d_date_sk, d_week_seq, d_day_name], isBroadcast=[true], 
build=[right])\n:- Union(all=[true], union=[ws_sold_date_sk, 
ws_ext_sales_price])\n:  :- [#2] TableSourceScan(table=[[hive, 
tpcds_bin_orc_10000, web_sales, project=[ws_sold_date_sk, 
ws_ext_sales_price]]], fields=[ws_sold_date_sk, ws_ext_sales_price])\n:  +- 
[#3] TableSourceScan(table=[[hive, tpcds_bin_orc_10000, catalog_sales, 
project=[cs_sold_date_sk, cs_ext_sales_price]]], fields=[cs_sold_date_sk, 
cs_ext_sales_price])\n+- [#1] Exchange(distribution=[broadcast])\n]) [Source: 
HiveSource-tpcds_bin_orc_10000.web_sales, Source: 
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq, 
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"), ws_ext_sales_price, null:DOUBLE) AS $f1, CASE((d_day_name = 
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f2, CASE((d_day_name = 
_UTF-16LE'Tuesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f3, CASE((d_day_name = 
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f4, CASE((d_day_name = 
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f5, CASE((d_day_name = 
_UTF-16LE'Friday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f6, CASE((d_day_name = 
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
ws_ext_sales_price, null:DOUBLE) AS $f7]) -> 
LocalHashAggregate(groupBy=[d_week_seq], select=[d_week_seq, Partial_SUM($f1) 
AS sum$0, Partial_SUM($f2) AS sum$1, Partial_SUM($f3) AS sum$2, 
Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS sum$4, Partial_SUM($f6) AS 
sum$5, Partial_SUM($f7) AS sum$6]) (548/1050)#0" #200 prio=5 os_prio=0 
tid=0x00007fcda63ee800 nid=0x1624d waiting on condition [0x00007fcc9edfa000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000004fd5fce68> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.poll(FutureCompletingBlockingQueue.java:257)
{code}


was (Author: gaoyunhaii):
From the attached _jstack.txt_, the stacks of a single task look like this:

{code:java}
"Source Data Fetcher for MultipleInput(readOrder=[0,1,1], 
members=[\nHashJoin(joinType=[InnerJoin], where=[(d_date_sk = 
ws_sold_date_sk)], select=[ws_sold_date_sk, ws_ext_sales_price, d_date_sk, 
d_week_seq, d_day_name], isBroadcast=[true], build=[right])\n:- 
Union(all=[true], union=[ws_sold_date_sk, ws_ext_sales_price])\n:  :- [#2] 
TableSourceScan(table=[[hive, tpcds_bin_orc_10000, web_sales, 
project=[ws_sold_date_sk, ws_ext_sales_price]]], fields=[ws_sold_date_sk, 
ws_ext_sales_price])\n:  +- [#3] TableSourceScan(table=[[hive, 
tpcds_bin_orc_10000, catalog_sales, project=[cs_sold_date_sk, 
cs_ext_sales_price]]], fields=[cs_sold_date_sk, cs_ext_sales_price])\n+- [#1] 
Exchange(distribution=[broadcast])\n]) [Source: 
HiveSource-tpcds_bin_orc_10000.web_sales, Source: 
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq, 
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"), sales_price, null:DOUBLE) AS $f1, CASE((d_day_name = 
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f2, CASE((d_day_name = _UTF-16LE'Tuesday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f3, CASE((d_day_name = 
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
sales_price, null:DOUBLE) AS $f4, CASE((d_day_name = 
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f5, CASE((d_day_name = _UTF-16LE'Friday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f6, CASE((d_day_name = 
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f7]) -> LocalHashAggregate(groupBy=[d_week_seq], 
select=[d_week_seq, Partial_SUM($f1) AS sum$0, Partial_SUM($f2) AS sum$1, 
Partial_SUM($f3) AS sum$2, Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS 
sum$4, Partial_SUM($f6) AS sum$5, Partial_SUM($f7) AS sum$6]) (436/1050)#0" 
Id=246 WAITING on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3e32560f
        at sun.misc.Unsafe.park(Native Method)
        -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3e32560f
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:189)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at org.apache.flink.connector.file.src.util.Pool.pollEntry(Pool.java:82)
        at 
org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.getCachedEntry(AbstractOrcFileInputFormat.java:292)
        at 
org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.readBatch(AbstractOrcFileInputFormat.java:256)
        at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
        ...
--
"Source Data Fetcher for MultipleInput(readOrder=[0,1,1], 
members=[\nHashJoin(joinType=[InnerJoin], where=[(d_date_sk = 
ws_sold_date_sk)], select=[ws_sold_date_sk, ws_ext_sales_price, d_date_sk, 
d_week_seq, d_day_name], isBroadcast=[true], build=[right])\n:- 
Union(all=[true], union=[ws_sold_date_sk, ws_ext_sales_price])\n:  :- [#2] 
TableSourceScan(table=[[hive, tpcds_bin_orc_10000, web_sales, 
project=[ws_sold_date_sk, ws_ext_sales_price]]], fields=[ws_sold_date_sk, 
ws_ext_sales_price])\n:  +- [#3] TableSourceScan(table=[[hive, 
tpcds_bin_orc_10000, catalog_sales, project=[cs_sold_date_sk, 
cs_ext_sales_price]]], fields=[cs_sold_date_sk, cs_ext_sales_price])\n+- [#1] 
Exchange(distribution=[broadcast])\n]) [Source: 
HiveSource-tpcds_bin_orc_10000.web_sales, Source: 
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq, 
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"), sales_price, null:DOUBLE) AS $f1, CASE((d_day_name = 
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f2, CASE((d_day_name = _UTF-16LE'Tuesday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f3, CASE((d_day_name = 
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
sales_price, null:DOUBLE) AS $f4, CASE((d_day_name = 
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f5, CASE((d_day_name = _UTF-16LE'Friday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f6, CASE((d_day_name = 
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f7]) -> LocalHashAggregate(groupBy=[d_week_seq], 
select=[d_week_seq, Partial_SUM($f1) AS sum$0, Partial_SUM($f2) AS sum$1, 
Partial_SUM($f3) AS sum$2, Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS 
sum$4, Partial_SUM($f6) AS sum$5, Partial_SUM($f7) AS sum$6]) (436/1050)#0" 
Id=245 RUNNABLE
        at 
java.util.concurrent.CompletableFuture.cleanStack(CompletableFuture.java:483)
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:553)
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:1054)
        at 
java.util.concurrent.CompletableFuture$OrRelay.tryFire(CompletableFuture.java:1535)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.moveToAvailable(FutureCompletingBlockingQueue.java:169)
        at 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.enqueue(FutureCompletingBlockingQueue.java:341)
        ...
--
"MultipleInput(readOrder=[0,1,1], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(d_date_sk = ws_sold_date_sk)], select=[ws_sold_date_sk, 
ws_ext_sales_price, d_date_sk, d_week_seq, d_day_name], isBroadcast=[true], 
build=[right])\n:- Union(all=[true], union=[ws_sold_date_sk, 
ws_ext_sales_price])\n:  :- [#2] TableSourceScan(table=[[hive, 
tpcds_bin_orc_10000, web_sales, project=[ws_sold_date_sk, 
ws_ext_sales_price]]], fields=[ws_sold_date_sk, ws_ext_sales_price])\n:  +- 
[#3] TableSourceScan(table=[[hive, tpcds_bin_orc_10000, catalog_sales, 
project=[cs_sold_date_sk, cs_ext_sales_price]]], fields=[cs_sold_date_sk, 
cs_ext_sales_price])\n+- [#1] Exchange(distribution=[broadcast])\n]) [Source: 
HiveSource-tpcds_bin_orc_10000.web_sales, Source: 
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq, 
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"), sales_price, null:DOUBLE) AS $f1, CASE((d_day_name = 
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f2, CASE((d_day_name = _UTF-16LE'Tuesday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f3, CASE((d_day_name = 
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
sales_price, null:DOUBLE) AS $f4, CASE((d_day_name = 
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f5, CASE((d_day_name = _UTF-16LE'Friday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f6, CASE((d_day_name = 
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f7]) -> LocalHashAggregate(groupBy=[d_week_seq], 
select=[d_week_seq, Partial_SUM($f1) AS sum$0, Partial_SUM($f2) AS sum$1, 
Partial_SUM($f3) AS sum$2, Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS 
sum$4, Partial_SUM($f6) AS sum$5, Partial_SUM($f7) AS sum$6]) (436/1050)#0" 
Id=233 WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@55cb8e3e 
owned by "Source Data Fetcher for MultipleInput(readOrder=[0,1,1], 
members=[\nHashJoin(joinType=[InnerJoin], where=[(d_date_sk = 
ws_sold_date_sk)], select=[ws_sold_date_sk, ws_ext_sales_price, d_date_sk, 
d_week_seq, d_day_name], isBroadcast=[true], build=[right])\n:- 
Union(all=[true], union=[ws_sold_date_sk, ws_ext_sales_price])\n:  :- [#2] 
TableSourceScan(table=[[hive, tpcds_bin_orc_10000, web_sales, 
project=[ws_sold_date_sk, ws_ext_sales_price]]], fields=[ws_sold_date_sk, 
ws_ext_sales_price])\n:  +- [#3] TableSourceScan(table=[[hive, 
tpcds_bin_orc_10000, catalog_sales, project=[cs_sold_date_sk, 
cs_ext_sales_price]]], fields=[cs_sold_date_sk, cs_ext_sales_price])\n+- [#1] 
Exchange(distribution=[broadcast])\n]) [Source: 
HiveSource-tpcds_bin_orc_10000.web_sales, Source: 
HiveSource-tpcds_bin_orc_10000.catalog_sales] -> Calc(select=[d_week_seq, 
CASE((d_day_name = _UTF-16LE'Sunday':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"), sales_price, null:DOUBLE) AS $f1, CASE((d_day_name = 
_UTF-16LE'Monday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f2, CASE((d_day_name = _UTF-16LE'Tuesday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f3, CASE((d_day_name = 
_UTF-16LE'Wednesday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
sales_price, null:DOUBLE) AS $f4, CASE((d_day_name = 
_UTF-16LE'Thursday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f5, CASE((d_day_name = _UTF-16LE'Friday':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"), sales_price, null:DOUBLE) AS $f6, CASE((d_day_name = 
_UTF-16LE'Saturday':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), sales_price, 
null:DOUBLE) AS $f7]) -> LocalHashAggregate(groupBy=[d_week_seq], 
select=[d_week_seq, Partial_SUM($f1) AS sum$0, Partial_SUM($f2) AS sum$1, 
Partial_SUM($f3) AS sum$2, Partial_SUM($f4) AS sum$3, Partial_SUM($f5) AS 
sum$4, Partial_SUM($f6) AS sum$5, Partial_SUM($f7) AS sum$6]) (436/1050)#0" 
Id=245
        at sun.misc.Unsafe.park(Native Method)
        -  waiting on 
java.util.concurrent.locks.ReentrantLock$NonfairSync@55cb8e3e
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:189)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.poll(FutureCompletingBlockingQueue.java:251)
        ...
{code}


> MultipleInputOperator is running much more slowly in TPCDS
> ----------------------------------------------------------
>
>                 Key: FLINK-24300
>                 URL: https://issues.apache.org/jira/browse/FLINK-24300
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.14.0, 1.15.0
>            Reporter: Zhilong Hong
>            Priority: Major
>         Attachments: 64570e4c56955713ca599fd1d7ae7be891a314c6.png, 
> detail-of-the-job.png, e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png, 
> jstack.txt, jstack.txt
>
>
> When running TPCDS with release 1.14, we find that the job with 
> MultipleInputOperator runs much more slowly than before. With a binary 
> search among the commits, we find that the issue may have been introduced by 
> FLINK-23408. 
> At the commit 64570e4c56955713ca599fd1d7ae7be891a314c6, the job runs normally 
> in TPCDS, as the image below illustrates:
> !64570e4c56955713ca599fd1d7ae7be891a314c6.png|width=600!
> At the commit e3010c16947ed8da2ecb7d89a3aa08dacecc524a, the job q2.sql gets 
> stuck for a pretty long time (longer than half an hour), as the image below 
> illustrates:
> !e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png|width=600!
> The detail of the job is illustrated below:
> !detail-of-the-job.png|width=600!
> The job uses a {{MultipleInputOperator}} with one normal input and two 
> chained FileSources. It has finished reading the normal input and has started 
> to read the chained sources. Each chained source has one source data fetcher.
> We captured the jstack of the stuck tasks and attached the file below. From the 
> [^jstack.txt] we can see that the main thread is blocked waiting for the lock, 
> and the lock is held by a source data fetcher. The source data fetcher is 
> still running, but its stack stays in {{CompletableFuture.cleanStack}}.
> This issue happens in a batch job. However, judging from where it gets 
> blocked, it seems to affect streaming jobs as well.
> For reference, the TPCDS code we are running is located at 
> [https://github.com/ververica/flink-sql-benchmark/tree/dev].
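
The blocking pattern described above (a consumer thread waiting on a lock while the lock holder is still busy inside {{CompletableFuture.complete}}) can be illustrated with a minimal, self-contained sketch. This is not Flink code: the class name, thread names, and latches are hypothetical. It only shows that callbacks registered on a future run synchronously on the thread that completes it, so any lock held around {{complete()}} stays held until they finish. It does not reproduce the {{cleanStack}} slowness itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; this is not Flink code.
final class LockHeldDuringCompletion {

    /** Runs the scenario once and returns the observed order of events. */
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        CompletableFuture<Void> availability = new CompletableFuture<>();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch callbackStarted = new CountDownLatch(1);
        CountDownLatch consumerReady = new CountDownLatch(1);

        // Registered before completion, so it runs synchronously on whichever
        // thread later calls complete().
        availability.thenRun(() -> {
            callbackStarted.countDown();
            awaitQuietly(consumerReady); // stand-in for slow post-completion work
            events.add("callback finished");
        });

        // Plays the role of the source data fetcher: completes the future
        // while holding the lock.
        Thread fetcher = new Thread(() -> {
            lock.lock();
            try {
                availability.complete(null); // callback runs here, lock still held
            } finally {
                lock.unlock();
            }
        }, "fetcher");

        // Plays the role of the main task thread that needs the same lock.
        Thread consumer = new Thread(() -> {
            awaitQuietly(callbackStarted);
            consumerReady.countDown();
            lock.lock(); // cannot succeed before the fetcher leaves complete()
            try {
                events.add("consumer acquired lock");
            } finally {
                lock.unlock();
            }
        }, "consumer");

        fetcher.start();
        consumer.start();
        try {
            fetcher.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the consumer can only record its event after the callback has finished, because the fetcher releases the lock only once {{complete()}} returns. If the post-completion work were slow, the consumer would stay parked in {{ReentrantLock.lock}} for that whole time, which is the shape of the three stacks shown in the comment.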



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
