[https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423717#comment-16423717]
Amit Jain edited comment on FLINK-2685 at 4/3/18 9:35 AM:
----------------------------------------------------------
[~uce] [~StephanEwen] We are also facing the same issue on version 1.3.2. Could
you help us with this?
We are also using the JOIN operator (FullOuterJoin), and we can see jobs running
but not progressing at all.
I've also attached jstack dumps of the job manager and task manager
(job_manager_19_feb_15_30_running and task_manager_19_feb_15_30_running).
> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
> Key: FLINK-2685
> URL: https://issues.apache.org/jira/browse/FLINK-2685
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 0.10.0
> Reporter: Greg Hogan
> Assignee: Ufuk Celebi
> Priority: Major
> Attachments: job_manager_19_feb_15_30_running,
> task_manager_19_feb_15_30_running
>
>
> This deadlock occurs intermittently. I have a {{join}} followed by a
> {{chain<join,filter>}} followed by a {{reduceGroup}}. Stack traces and local
> variables from one thread of each {{join}} are below.
> The {{join}}s are waiting on a buffer to become available
> ({{networkBufferPool.availableMemorySegments.count=0}}). Both
> {{LocalBufferPool}}s have been given extra capacity ({{currentPoolSize=60 >
> numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity
> ({{currentPoolSize=numberOfRequestedMemorySegments=60}}), yet the second
> {{join}} has not acquired any segments ({{numberOfRequestedMemorySegments=0}}).
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles
> {{MemorySegment}}s from its {{availableMemorySegments}}, so any requested
> {{Buffer}}s are released only when they are explicitly recycled.
> First join stack trace and variable values from
> {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer<T> (id=723)
> waiting for: ArrayDeque<E> (id=724)
> Object.wait(long) line: not available [native method]
> LocalBufferPool.requestBuffer(boolean) line: 163
> LocalBufferPool.requestBufferBlocking() line: 133
> RecordWriter<T>.emit(T) line: 92
> OutputCollector<T>.collect(T) line: 65
> JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
> JoinDriver<IT1,IT2,OT>.run() line: 208
> RegularPactTask<S,OT>.run() line: 489
> RegularPactTask<S,OT>.invoke() line: 354
> Task.run() line: 581
> Thread.run() line: 745
> {noformat}
> {noformat}
> this LocalBufferPool (id=403)
>   availableMemorySegments ArrayDeque<E> (id=398)
>     elements Object[16] (id=422)
>     head 14
>     tail 14
>   currentPoolSize 60
>   isDestroyed false
>   networkBufferPool NetworkBufferPool (id=354)
>     allBufferPools HashSet<E> (id=424)
>     availableMemorySegments ArrayBlockingQueue<E> (id=427)
>       count 0
>       items Object[10240] (id=674)
>       itrs null
>       lock ReentrantLock (id=675)
>       notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
>       notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
>       putIndex 6954
>       takeIndex 6954
>     factoryLock Object (id=430)
>     isDestroyed false
>     managedBufferPools HashSet<E> (id=431)
>     memorySegmentSize 32768
>     numTotalRequiredBuffers 3226
>     totalNumberOfMemorySegments 10240
>   numberOfRequestedMemorySegments 60
>   numberOfRequiredMemorySegments 32
>   owner null
>   registeredListeners ArrayDeque<E> (id=421)
>     elements Object[16] (id=685)
>     head 0
>     tail 0
> askToRecycle false
> isBlocking true
> {noformat}
> Second join stack trace and variable values from
> {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]
> LockSupport.parkNanos(Object, long) line: 215
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
> LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467
> SingleInputGate.getNextBufferOrEvent() line: 414
> MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79
> MutableRecordReader<T>.next(T) line: 34
> ReaderIterator<T>.next(T) line: 59
> MutableHashTable$ProbeIterator<PT>.next() line: 1581
> MutableHashTable<BT,PT>.processProbeIter() line: 457
> MutableHashTable<BT,PT>.nextRecord() line: 555
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
> JoinDriver<IT1,IT2,OT>.run() line: 208
> RegularPactTask<S,OT>.run() line: 489
> RegularPactTask<S,OT>.invoke() line: 354
> Task.run() line: 581
> Thread.run() line: 745
> {noformat}
> {noformat}
> this SingleInputGate (id=693)
>   bufferPool LocalBufferPool (id=706)
>     availableMemorySegments ArrayDeque<E> (id=716)
>       elements Object[16] (id=717)
>       head 0
>       tail 0
>     currentPoolSize 60
>     isDestroyed false
>     networkBufferPool NetworkBufferPool (id=354)
>       allBufferPools HashSet<E> (id=424)
>       availableMemorySegments ArrayBlockingQueue<E> (id=427)
>         count 0
>         items Object[10240] (id=674)
>         itrs null
>         lock ReentrantLock (id=675)
>         notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
>         notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
>         putIndex 6954
>         takeIndex 6954
>       factoryLock Object (id=430)
>       isDestroyed false
>       managedBufferPools HashSet<E> (id=431)
>       memorySegmentSize 32768
>       numTotalRequiredBuffers 3226
>       totalNumberOfMemorySegments 10240
>     numberOfRequestedMemorySegments 0
>     numberOfRequiredMemorySegments 32
>     owner null
>     registeredListeners ArrayDeque<E> (id=718)
>   channelsWithEndOfPartitionEvents BitSet (id=707)
>   consumedResultId IntermediateDataSetID (id=708)
>   consumedSubpartitionIndex 24
>   executionId ExecutionAttemptID (id=709)
>   hasReceivedAllEndOfPartitionEvents false
>   inputChannels HashMap<K,V> (id=710)
>   inputChannelsWithData LinkedBlockingQueue<E> (id=692)
>     capacity 2147483647
>     count AtomicInteger (id=698)
>       value 0
>     head LinkedBlockingQueue$Node<E> (id=701)
>     last LinkedBlockingQueue$Node<E> (id=701)
>     notEmpty AbstractQueuedSynchronizer$ConditionObject (id=691)
>     notFull AbstractQueuedSynchronizer$ConditionObject (id=703)
>     putLock ReentrantLock (id=704)
>     takeLock ReentrantLock (id=705)
>   isReleased false
>   jobId JobID (id=711)
>   numberOfInputChannels 32
>   numberOfUninitializedChannels 0
>   owningTaskName "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
>   partitionStateChecker NetworkEnvironment$JobManagerPartitionStateChecker (id=363)
>   pendingEvents ArrayList<E> (id=713)
>   registeredListeners CopyOnWriteArrayList<E> (id=714)
>   requestedPartitionsFlag true
>   requestLock Object (id=715)
>   retriggerLocalRequestTimer null
> currentChannel null
> {noformat}
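The buffer accounting described in the issue can be illustrated with a minimal, self-contained Java sketch. It is not Flink's implementation: {{GlobalPool}}, {{LocalPool}} and their methods are hypothetical stand-ins for {{NetworkBufferPool}} and {{LocalBufferPool}}, the pool sizes are scaled down, and {{requestBuffer}} returns {{null}} instead of blocking so the starvation can be observed without a real deadlock. It shows that shrinking a pool whose segments are all in flight frees nothing, and only an explicit recycle hands a segment back to the global pool.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical, simplified model of the buffer accounting described in FLINK-2685.
 * Not Flink code: GlobalPool and LocalPool only loosely mirror NetworkBufferPool
 * and LocalBufferPool, and requests never block.
 */
public class BufferPoolSketch {

    /** Fixed number of segments shared by all local pools (stand-in for NetworkBufferPool). */
    static final class GlobalPool {
        private int available;

        GlobalPool(int totalSegments) { this.available = totalSegments; }

        boolean take() {
            if (available == 0) return false;
            available--;
            return true;
        }

        void give(int segments) { available += segments; }

        int available() { return available; }
    }

    /** Takes segments lazily from the global pool, up to currentPoolSize (stand-in for LocalBufferPool). */
    static final class LocalPool {
        private final GlobalPool global;
        private final ArrayDeque<Object> availableSegments = new ArrayDeque<>(); // idle, owned segments
        private int currentPoolSize;
        private int numberOfRequestedSegments; // taken from the global pool: idle or in flight

        LocalPool(GlobalPool global, int poolSize) {
            this.global = global;
            this.currentPoolSize = poolSize;
        }

        /** Returns a buffer, or null where the real pool would wait. */
        Object requestBuffer() {
            if (!availableSegments.isEmpty()) return availableSegments.poll();
            if (numberOfRequestedSegments < currentPoolSize && global.take()) {
                numberOfRequestedSegments++;
                return new Object(); // stands in for a MemorySegment
            }
            return null;
        }

        /** An in-flight buffer comes back; any excess segments go to the global pool. */
        void recycle(Object segment) {
            availableSegments.add(segment);
            returnExcessSegments();
        }

        /** Redistribution shrinks the pool, but only idle segments can be handed back. */
        void setPoolSize(int newSize) {
            currentPoolSize = newSize;
            returnExcessSegments();
        }

        private void returnExcessSegments() {
            while (numberOfRequestedSegments > currentPoolSize && !availableSegments.isEmpty()) {
                availableSegments.poll();
                numberOfRequestedSegments--;
                global.give(1);
            }
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(60);
        LocalPool first = new LocalPool(global, 60);  // like the first join's output pool
        LocalPool second = new LocalPool(global, 60); // like the second join's input pool

        // The first pool fills to capacity; its buffers are held (in flight) and not recycled.
        ArrayDeque<Object> inFlight = new ArrayDeque<>();
        Object buffer;
        while ((buffer = first.requestBuffer()) != null) inFlight.add(buffer);

        System.out.println("global available: " + global.available());
        System.out.println("second gets a buffer: " + (second.requestBuffer() != null));

        first.setPoolSize(32); // nothing is idle, so nothing is returned
        System.out.println("after shrink: " + global.available());

        first.recycle(inFlight.poll()); // only an explicit recycle frees a segment
        System.out.println("after recycle: " + global.available());
        System.out.println("second gets a buffer: " + (second.requestBuffer() != null));
    }
}
```

With these scaled-down sizes the five printed values are 0, false, 0, 1 and true: in this model the second pool starves until the first pool explicitly recycles a buffer, which mirrors the {{numberOfRequestedMemorySegments=60}} versus {{numberOfRequestedMemorySegments=0}} split in the dumps above.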