[ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444342#comment-16444342 ]

Amit Jain commented on FLINK-2685:
----------------------------------

[~NicoK] I've checked the Web UI and found that there is no progress at all.

This issue comes up randomly. We have also observed cases where jobs that
only need to process a few MB of data still hang.
 

> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
>                 Key: FLINK-2685
>                 URL: https://issues.apache.org/jira/browse/FLINK-2685
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Network
>    Affects Versions: 0.10.0
>            Reporter: Greg Hogan
>            Assignee: Ufuk Celebi
>            Priority: Major
>         Attachments: job_manager_19_feb_15_30_running, task_manager_19_feb_15_30_running
>
>
> This deadlock occurs intermittently. I have a {{join}} followed by a
> {{chain<join,filter>}} followed by a {{reduceGroup}}. Stack traces and local
> variables from one thread of each {{join}} are below.
> The {{join}}s are waiting for a buffer to become available
> ({{networkBufferPool.availableMemorySegments.count=0}}). Both
> {{LocalBufferPool}}s have been given extra capacity
> ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}). The first {{join}}
> is at full capacity ({{currentPoolSize=numberOfRequestedMemorySegments=60}}),
> yet the second {{join}} has not acquired any segments
> ({{numberOfRequestedMemorySegments=0}}).
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles
> {{MemorySegment}}s from its {{availableMemorySegments}}, so any requested
> {{Buffer}}s will only be released when they are explicitly recycled.
> First join stack trace and variable values from 
> {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer<T>  (id=723)   
> waiting for: ArrayDeque<E>  (id=724)  
> Object.wait(long) line: not available [native method] 
> LocalBufferPool.requestBuffer(boolean) line: 163      
> LocalBufferPool.requestBufferBlocking() line: 133     
> RecordWriter<T>.emit(T) line: 92      
> OutputCollector<T>.collect(T) line: 65        
> JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
> JoinDriver<IT1,IT2,OT>.run() line: 208        
> RegularPactTask<S,OT>.run() line: 489 
> RegularPactTask<S,OT>.invoke() line: 354      
> Task.run() line: 581  
> Thread.run() line: 745        
> {noformat}
> {noformat}
> this  LocalBufferPool  (id=403)       
>       availableMemorySegments ArrayDeque<E>  (id=398) 
>               elements        Object[16]  (id=422)    
>               head    14      
>               tail    14      
>       currentPoolSize 60      
>       isDestroyed     false   
>       networkBufferPool       NetworkBufferPool  (id=354)     
>               allBufferPools  HashSet<E>  (id=424)    
>               availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
>                       count   0       
>                       items   Object[10240]  (id=674) 
>                       itrs    null    
>                       lock    ReentrantLock  (id=675) 
>                       notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
>                       notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
>                       putIndex        6954    
>                       takeIndex       6954    
>               factoryLock     Object  (id=430)        
>               isDestroyed     false   
>               managedBufferPools      HashSet<E>  (id=431)    
>               memorySegmentSize       32768   
>               numTotalRequiredBuffers 3226    
>               totalNumberOfMemorySegments     10240   
>       numberOfRequestedMemorySegments 60      
>       numberOfRequiredMemorySegments  32      
>       owner   null    
>       registeredListeners     ArrayDeque<E>  (id=421) 
>               elements        Object[16]  (id=685)    
>               head    0       
>               tail    0       
> askToRecycle  false   
> isBlocking    true    
> {noformat}
> Second join stack trace and variable values from 
> {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]        
> LockSupport.parkNanos(Object, long) line: 215 
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078        
> LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467 
> SingleInputGate.getNextBufferOrEvent() line: 414      
> MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79     
> MutableRecordReader<T>.next(T) line: 34       
> ReaderIterator<T>.next(T) line: 59    
> MutableHashTable$ProbeIterator<PT>.next() line: 1581  
> MutableHashTable<BT,PT>.processProbeIter() line: 457  
> MutableHashTable<BT,PT>.nextRecord() line: 555        
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
> JoinDriver<IT1,IT2,OT>.run() line: 208        
> RegularPactTask<S,OT>.run() line: 489 
> RegularPactTask<S,OT>.invoke() line: 354      
> Task.run() line: 581  
> Thread.run() line: 745        
> {noformat}
> {noformat}
> this  SingleInputGate  (id=693)       
>       bufferPool      LocalBufferPool  (id=706)       
>               availableMemorySegments ArrayDeque<E>  (id=716) 
>                       elements        Object[16]  (id=717)    
>                       head    0       
>                       tail    0       
>               currentPoolSize 60      
>               isDestroyed     false   
>               networkBufferPool       NetworkBufferPool  (id=354)     
>                       allBufferPools  HashSet<E>  (id=424)    
>                       availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
>                               count   0       
>                               items   Object[10240]  (id=674) 
>                               itrs    null    
>                               lock    ReentrantLock  (id=675) 
>                               notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
>                               notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
>                               putIndex        6954    
>                               takeIndex       6954    
>                       factoryLock     Object  (id=430)        
>                       isDestroyed     false   
>                       managedBufferPools      HashSet<E>  (id=431)    
>                       memorySegmentSize       32768   
>                       numTotalRequiredBuffers 3226    
>                       totalNumberOfMemorySegments     10240   
>               numberOfRequestedMemorySegments 0       
>               numberOfRequiredMemorySegments  32      
>               owner   null    
>               registeredListeners     ArrayDeque<E>  (id=718) 
>       channelsWithEndOfPartitionEvents        BitSet  (id=707)        
>       consumedResultId        IntermediateDataSetID  (id=708) 
>       consumedSubpartitionIndex       24      
>       executionId     ExecutionAttemptID  (id=709)    
>       hasReceivedAllEndOfPartitionEvents      false   
>       inputChannels   HashMap<K,V>  (id=710)  
>       inputChannelsWithData   LinkedBlockingQueue<E>  (id=692)        
>               capacity        2147483647      
>               count   AtomicInteger  (id=698) 
>                       value   0       
>               head    LinkedBlockingQueue$Node<E>  (id=701)   
>               last    LinkedBlockingQueue$Node<E>  (id=701)   
>               notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=691)
>               notFull AbstractQueuedSynchronizer$ConditionObject  (id=703)    
>               putLock ReentrantLock  (id=704) 
>               takeLock        ReentrantLock  (id=705) 
>       isReleased      false   
>       jobId   JobID  (id=711) 
>       numberOfInputChannels   32      
>       numberOfUninitializedChannels   0       
>       owningTaskName  "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)"  (id=712)
>       partitionStateChecker   NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)
>       pendingEvents   ArrayList<E>  (id=713)  
>       registeredListeners     CopyOnWriteArrayList<E>  (id=714)       
>       requestedPartitionsFlag true    
>       requestLock     Object  (id=715)        
>       retriggerLocalRequestTimer      null    
> currentChannel        null    
> {noformat}
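
The buffer accounting described in the quoted report can be illustrated with a deliberately simplified model. This is a hypothetical sketch, not Flink code: `GlobalPool`, `LocalPool`, `tryRequest`, `returnExcess` and `simulate` are illustrative names that only loosely mirror `NetworkBufferPool`/`LocalBufferPool`, requests return `null` instead of blocking, and the other pools sharing the global pool are ignored. It shows the point made in the description: lowering a pool's target size frees nothing while its buffers are still in flight, because only idle segments can be handed back.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the two-level buffer accounting described
// in FLINK-2685. Names are illustrative and are NOT Flink's actual API.
public class BufferPoolDeadlockSketch {

    /** Fixed set of segments shared by all local pools (loosely cf. NetworkBufferPool). */
    static final class GlobalPool {
        private final ArrayDeque<Object> free = new ArrayDeque<>();

        GlobalPool(int total) {
            for (int i = 0; i < total; i++) {
                free.add(new Object());
            }
        }

        Object poll() { return free.poll(); } // null when exhausted
        void recycle(Object segment) { free.add(segment); }
    }

    /** Per-task pool (loosely cf. LocalBufferPool). */
    static final class LocalPool {
        private final GlobalPool global;
        private final ArrayDeque<Object> available = new ArrayDeque<>(); // idle segments only
        int currentPoolSize;   // target size set by redistribution
        int numberOfRequested; // segments currently taken from the global pool

        LocalPool(GlobalPool global, int currentPoolSize) {
            this.global = global;
            this.currentPoolSize = currentPoolSize;
        }

        /** Non-blocking request; a real pool would wait here instead of returning null. */
        Object tryRequest() {
            Object segment = available.poll();
            if (segment == null && numberOfRequested < currentPoolSize) {
                segment = global.poll();
                if (segment != null) {
                    numberOfRequested++;
                }
            }
            return segment;
        }

        /** Called once a buffer has been consumed; may release excess segments. */
        void recycle(Object segment) {
            available.add(segment);
            returnExcess();
        }

        /** Shrinking can only hand back IDLE segments; in-flight ones stay owned. */
        void returnExcess() {
            while (numberOfRequested > currentPoolSize && !available.isEmpty()) {
                global.recycle(available.poll());
                numberOfRequested--;
            }
        }
    }

    /** Returns how many segments the second pool obtains after the first pool is shrunk. */
    static int simulate(boolean consumerRecycles) {
        GlobalPool global = new GlobalPool(60);
        LocalPool first = new LocalPool(global, 60);
        LocalPool second = new LocalPool(global, 60);

        // The first pool drains the global pool; its buffers are now in flight.
        List<Object> inFlight = new ArrayList<>();
        Object segment;
        while ((segment = first.tryRequest()) != null) {
            inFlight.add(segment);
        }

        // Redistribution lowers the first pool's target, but nothing is idle yet.
        first.currentPoolSize = 32;
        first.returnExcess();

        if (consumerRecycles) {
            // Only explicit recycling releases the excess segments.
            inFlight.forEach(first::recycle);
        }

        int obtained = 0;
        while (second.tryRequest() != null) {
            obtained++;
        }
        return obtained;
    }

    public static void main(String[] args) {
        System.out.println("without recycling: " + simulate(false));
        System.out.println("with recycling:    " + simulate(true));
    }
}
```

Running `main` prints 0 for the case without recycling: the second pool is starved exactly as in the dump (global `count=0`, second pool `numberOfRequestedMemorySegments=0`). It prints 28 for the case with recycling, where the first pool's 28 segments above its new target of 32 flow back to the global pool once its buffers are recycled.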



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
