[ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-11783: ----------------------------------- Labels: auto-deprioritized-major (was: stale-major) > Deadlock during Join operation > ------------------------------ > > Key: FLINK-11783 > URL: https://issues.apache.org/jira/browse/FLINK-11783 > Project: Flink > Issue Type: Bug > Components: API / DataSet > Affects Versions: 1.7.2 > Reporter: Julien Nioche > Priority: Major > Labels: auto-deprioritized-major > Attachments: flink_is_stuck.png > > > I am running a filtering job on a large dataset with Flink running in > distributed mode. Most tasks in the Join operation have completed a while ago > and only the tasks from a particular TaskManager are still running. These > tasks make progress but extremely slowly. > When logging onto the machine running this TM I can see that all threads are > TIMED_WAITING . > Could there be a synchronization problem? > See attachment for a screenshot of the Flink UI and the stack below. > > *{{$ jstack 9183 | grep -A 15 "DataSetFilterJob"}}* > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > <init>(DataSetFilterJob.java:67)) (66/150)" #155 prio=5 os_prio=0 > tid=0x00007faa5c01c000 nid=0x248c waiting on condition [0x00007fa9d15d5000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x00000007bfa89578> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}} > {{ at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}} > {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}} > {{--}} > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > <init>(DataSetFilterJob.java:67)) (65/150)" #154 prio=5 os_prio=0 > tid=0x00007faa5c01b000 nid=0x248b waiting on condition [0x00007fa9d14d4000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x00000007b8e0eb50> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}} > {{ at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}} > {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}} > {{--}} > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > <init>(DataSetFilterJob.java:67)) (68/150)" #153 prio=5 os_prio=0 > tid=0x00007faa5c019800 nid=0x248a waiting on condition [0x00007fa981df6000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x0000000774903a00> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}} > {{ at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView.nextSegment(HeaderlessChannelReaderInputView.java:90)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}} > {{ at org.apache.flink.types.StringValue.readString(StringValue.java:769)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}} > {{--}} > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > <init>(DataSetFilterJob.java:67)) (67/150)" #152 prio=5 os_prio=0 > tid=0x00007faa5c018800 nid=0x2489 waiting on condition [0x00007fa98c194000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x00000007b08a0508> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}} > {{ at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}} > {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}} > {{--}} > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > <init>(DataSetFilterJob.java:67)) (62/150)" #151 prio=5 os_prio=0 > tid=0x00007faa5c017800 nid=0x2488 in Object.wait() [0x00007fa98c295000]}} > {{ java.lang.Thread.State: TIMED_WAITING (on object monitor)}} > {{ at java.lang.Object.wait(Native Method)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:127)}} > {{ - locked <0x00000007ab60d660> (a java.lang.Object)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:159)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:877)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)}} > {{ at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)}} > {{ at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)}} > {{ at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)}} > {{ at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)}} > {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)}} > {{ at java.lang.Thread.run(Thread.java:748)}}{{"CHAIN Join (Join at > with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) > (64/150)" #150 prio=5 os_prio=0 tid=0x00007faa5c016800 nid=0x2487 in > Object.wait() [0x00007fa981ef7000]}} > {{ java.lang.Thread.State: TIMED_WAITING (on object monitor)}} > {{ at java.lang.Object.wait(Native Method)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:127)}} > {{ - locked <0x00000007ab60d650> (a java.lang.Object)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:159)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:877)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)}} > {{ at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)}} > {{ at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)}} > {{ at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)}} > {{ at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)}} > {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)}} > {{ at java.lang.Thread.run(Thread.java:748)}}{{"CHAIN Join (Join at > with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) > (63/150)" #149 prio=5 os_prio=0 tid=0x00007faa5c015800 nid=0x2486 waiting on > condition [0x00007fa9d0fcf000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x000000077439ea00> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}} > {{ at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView.nextSegment(HeaderlessChannelReaderInputView.java:90)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}} > {{ at org.apache.flink.types.StringValue.readString(StringValue.java:769)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}} > {{--}} > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > <init>(DataSetFilterJob.java:67)) (61/150)" #148 prio=5 os_prio=0 > tid=0x00007faa5c014800 nid=0x2485 in Object.wait() [0x00007fa9d2cf1000]}} > {{ java.lang.Thread.State: TIMED_WAITING (on object monitor)}} > {{ at java.lang.Object.wait(Native Method)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:127)}} > {{ - locked <0x00000007ab60d640> (a java.lang.Object)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:159)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:877)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)}} > {{ at > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)}} > {{ at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)}} > {{ at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)}} > {{ at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)}} > {{ at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)}} > {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)}} > {{ at java.lang.Thread.run(Thread.java:748)}} > -- This message was sent by Atlassian Jira (v8.3.4#803005)