[
https://issues.apache.org/jira/browse/HIVE-27078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
László Bodor updated HIVE-27078:
--------------------------------
Description:
Consider this DAG:
{code}
| Map 1 <- Reducer 3 (CUSTOM_EDGE) |
| Map 2 <- Map 4 (CUSTOM_EDGE) |
| Map 5 <- Map 1 (CUSTOM_EDGE) |
| Reducer 3 <- Map 2 (SIMPLE_EDGE)
{code}
This was taken from a customer query and could be simplified further; the
problematic vertices and edge are the following (the CUSTOM_EDGE means
Reducer 3 feeds the hash table side of a bucket map join in Map 1):
{code}
| Map 1 <- Reducer 3 (CUSTOM_EDGE) |
{code}
Reducer 3 was originally scheduled with 20 tasks, but auto reducer
parallelism later decided that only 4 tasks are needed:
{code}
2023-02-07 13:00:36,078 [INFO] [App Shared Pool - #4]
|vertexmanager.ShuffleVertexManager|: Reducing auto parallelism for vertex:
Reducer 3 from 20 to 4
{code}
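For context, a rough sketch of how that decision is made (an approximation for illustration, not the actual ShuffleVertexManager code): the vertex manager estimates the total shuffle output, divides it by a desired per-task input size, and clamps the result to the originally scheduled parallelism.
{code}
// Rough illustration (not actual Tez code) of ShuffleVertexManager-style
// auto reducer parallelism: pick a task count so each task receives roughly
// desiredTaskInputBytes of shuffle input, never exceeding the original count.
public class AutoParallelismSketch {
    static int newParallelism(long totalEstimatedOutputBytes,
                              long desiredTaskInputBytes,
                              int scheduledParallelism) {
        int desired = (int) Math.ceil(
            (double) totalEstimatedOutputBytes / desiredTaskInputBytes);
        return Math.max(1, Math.min(scheduledParallelism, desired));
    }

    public static void main(String[] args) {
        // Hypothetical numbers matching the log above: 1 GiB of estimated
        // output at 256 MiB per task shrinks 20 scheduled tasks to 4.
        System.out.println(newParallelism(1L << 30, 256L << 20, 20)); // 4
    }
}
{code}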
In this case, Map 1 can hang because it still expects 20 inputs while only 4
tasks ever produce output:
{code}
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 4 ..........      container     SUCCEEDED     16         16        0        0      0      0
Map 2 ..........      container     SUCCEEDED     48         48        0        0      0      0
Reducer 3 ......      container     SUCCEEDED      4          4        0        0      0      0
Map 1                 container       RUNNING    192          0       13      179      0      0
Map 5                 container        INITED    241          0        0      241      0      0
----------------------------------------------------------------------------------------------
VERTICES: 03/05  [===>>-----------------------] 13%  ELAPSED TIME: 901.18 s
----------------------------------------------------------------------------------------------
{code}
In the logs of a hanging Map 1 task attempt it looks like this:
{code}
2022-12-08 09:42:26,845 [INFO] [I/O Setup 2 Start: {Reducer 3}] |impl.ShuffleManager|: Reducer_3: numInputs=20, compressionCodec=org.apache.hadoop.io.compress.SnappyCodec, numFetchers=10, ifileBufferSize=4096, ifileReadAheadEnabled=true, ifileReadAheadLength=4194304, localDiskFetchEnabled=true, sharedFetchEnabled=false, keepAlive=true, keepAliveMaxConnections=20, connectionTimeout=180000, readTimeout=180000, bufferSize=8192, bufferSize=8192, maxTaskOutputAtOnce=20, asyncHttp=false
...
receives the input event:
2022-12-08 09:42:27,134 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: Routing events from heartbeat response to task, currentTaskAttemptId=attempt_1670331499491_1408_1_03_000039_0, eventCount=1 fromEventId=0 nextFromEventId=0
...but then it hangs while waiting for further inputs:
"TezChild" #29 daemon prio=5 os_prio=0 tid=0x00007f3fae141000 nid=0x9581 waiting on condition [0x00007f3f737ba000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x000000071ad90a00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
	at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
	at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.getNextInput(ShuffleManager.java:1033)
	at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:202)
	at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next(UnorderedKVReader.java:125)
	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader.load(VectorMapJoinFastHashTableLoader.java:129)
	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTableInternal(MapJoinOperator.java:385)
	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:454)
	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.initializeOp(MapJoinOperator.java:241)
	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinCommonOperator.initializeOp(VectorMapJoinCommonOperator.java:555)
	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinGenerateResultOperator.initializeOp(VectorMapJoinGenerateResultOperator.java:111)
	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:374)
	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:571)
	at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:523)
	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:384)
	at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:268)
	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:252)
	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
{code}
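The hang mechanism can be illustrated in isolation. Below is a minimal, self-contained sketch (made-up class name and numbers, not Tez code): the consumer is initialized to expect 20 inputs, but only 4 producer tasks ever complete, so the fifth blocking take() parks the thread forever, matching the WAITING (parking) state at ShuffleManager.getNextInput above.
{code}
import java.util.concurrent.LinkedBlockingDeque;

// Minimal sketch of the hang (illustrative only, not Tez code): the reader
// was initialized with numInputs=20, but after auto reducer parallelism only
// 4 upstream tasks ever produce output, so the 5th take() blocks forever.
public class BucketMapJoinHangSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingDeque<String> completedInputs = new LinkedBlockingDeque<>();
        int expectedInputs = 20; // what Map 1 was told at initialization
        int actualProducers = 4; // Reducer 3 after auto parallelism kicked in

        for (int i = 0; i < actualProducers; i++) {
            completedInputs.put("input-" + i);
        }

        for (int i = 0; i < expectedInputs; i++) {
            // Mirrors ShuffleManager.getNextInput(): a blocking take().
            // Iterations 0..3 succeed; iteration 4 parks this thread forever.
            System.out.println("got " + completedInputs.take());
        }
    }
}
{code}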
As a quick, temporary workaround, we can disable auto reducer parallelism on
a vertex if it is the source of a bucket map join.
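Until that lands, the optimization can also be turned off for an affected query at the session level. A sketch of that mitigation, assuming the standard Hive setting (note it disables auto reducer parallelism for the whole query, not just for bucket map join source vertices):
{code}
-- disables ShuffleVertexManager auto parallelism for every reducer in the
-- query, so Reducer 3 keeps its originally scheduled task count
set hive.tez.auto.reducer.parallelism=false;
{code}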
> Bucket Map Join can hang if the source task's parallelism is changed by reducer autoparallelism
> ------------------------------------------------------------------------------------------------
>
> Key: HIVE-27078
> URL: https://issues.apache.org/jira/browse/HIVE-27078
> Project: Hive
> Issue Type: Bug
> Reporter: László Bodor
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)