[ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Kruber updated FLINK-2685:
-------------------------------
    Component/s: Network

> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
>                 Key: FLINK-2685
>                 URL: https://issues.apache.org/jira/browse/FLINK-2685
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Network
>    Affects Versions: 0.10.0
>            Reporter: Greg Hogan
>            Assignee: Ufuk Celebi
>            Priority: Major
>         Attachments: job_manager_19_feb_15_30_running, task_manager_19_feb_15_30_running
>
>
> This deadlock occurs intermittently. I have a {{join}} followed by a
> {{chain<join,filter>}} followed by a {{reduceGroup}}. Stack traces and local
> variables from one thread of each {{join}} are shown below.
> The {{join}}s are waiting for a buffer to become available
> ({{networkBufferPool.availableMemorySegments.count=0}}). Both
> {{LocalBufferPool}}s have been given extra capacity
> ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity
> ({{currentPoolSize=numberOfRequestedMemorySegments=60}}), yet the second
> {{join}} has not acquired any segments ({{numberOfRequestedMemorySegments=0}}).
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles
> {{MemorySegment}}s from its {{availableMemorySegments}}, so any requested
> {{Buffer}}s are released only when they are explicitly recycled.
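
The last point can be illustrated with a minimal, single-threaded model. This is a sketch under stated assumptions, not Flink's code. {{GlobalPool}}, {{LocalPool}}, {{resize}} and the non-blocking {{requestBuffer}} are hypothetical simplifications. Only the field names {{currentPoolSize}}, {{numberOfRequestedMemorySegments}} and {{availableMemorySegments}} come from the dumps. The model shows that shrinking one pool cannot reclaim segments that are in flight, so a pool that has been granted capacity may still be unable to obtain a single segment until somebody calls {{recycle}}.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified model of the pools described above (NOT Flink's implementation).
final class GlobalPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    GlobalPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    byte[] requestSegment() { return available.poll(); } // null when exhausted

    void recycle(byte[] segment) { available.add(segment); }

    int availableCount() { return available.size(); }
}

final class LocalPool {
    private final GlobalPool global;
    private final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
    int currentPoolSize;
    int numberOfRequestedMemorySegments;

    LocalPool(GlobalPool global, int poolSize) {
        this.global = global;
        this.currentPoolSize = poolSize;
    }

    /** Non-blocking request: returns null if no segment can be obtained right now. */
    byte[] requestBuffer() {
        byte[] segment = availableMemorySegments.poll();
        if (segment == null && numberOfRequestedMemorySegments < currentPoolSize) {
            segment = global.requestSegment();
            if (segment != null) {
                numberOfRequestedMemorySegments++;
            }
        }
        return segment;
    }

    /** Called by whoever holds a buffer once it is done with it. */
    void recycle(byte[] segment) {
        if (numberOfRequestedMemorySegments > currentPoolSize) {
            global.recycle(segment); // pool is over its size: hand the segment back
            numberOfRequestedMemorySegments--;
        } else {
            availableMemorySegments.add(segment);
        }
    }

    /** Resizing reclaims only segments that are idle in this pool. */
    void resize(int newSize) {
        currentPoolSize = newSize;
        while (numberOfRequestedMemorySegments > currentPoolSize) {
            byte[] idle = availableMemorySegments.poll();
            if (idle == null) {
                return; // in-flight segments come back only via recycle()
            }
            global.recycle(idle);
            numberOfRequestedMemorySegments--;
        }
    }
}

class BufferPoolDemo {
    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(4, 32768);
        LocalPool a = new LocalPool(global, 4);
        byte[][] inFlight = new byte[4][];
        for (int i = 0; i < 4; i++) {
            inFlight[i] = a.requestBuffer(); // all four segments handed to a writer
        }
        LocalPool b = new LocalPool(global, 0);
        a.resize(2); // a shrinks on paper...
        b.resize(2); // ...and b grows, but only nominally
        System.out.println(b.requestBuffer() == null); // true: nothing idle to reclaim
        a.recycle(inFlight[0]);                        // a is over its size, so this goes global
        System.out.println(b.requestBuffer() != null); // true
    }
}
```

The behavior being illustrated is that {{resize}} only touches idle segments. A segment handed out as a buffer comes back only through {{recycle}}.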
> First join stack trace and variable values from 
> {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer<T>  (id=723)   
> waiting for: ArrayDeque<E>  (id=724)  
> Object.wait(long) line: not available [native method] 
> LocalBufferPool.requestBuffer(boolean) line: 163      
> LocalBufferPool.requestBufferBlocking() line: 133     
> RecordWriter<T>.emit(T) line: 92      
> OutputCollector<T>.collect(T) line: 65        
> JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
> JoinDriver<IT1,IT2,OT>.run() line: 208        
> RegularPactTask<S,OT>.run() line: 489 
> RegularPactTask<S,OT>.invoke() line: 354      
> Task.run() line: 581  
> Thread.run() line: 745        
> {noformat}
> {noformat}
> this  LocalBufferPool  (id=403)       
>       availableMemorySegments ArrayDeque<E>  (id=398) 
>               elements        Object[16]  (id=422)    
>               head    14      
>               tail    14      
>       currentPoolSize 60      
>       isDestroyed     false   
>       networkBufferPool       NetworkBufferPool  (id=354)     
>               allBufferPools  HashSet<E>  (id=424)    
>               availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
>                       count   0       
>                       items   Object[10240]  (id=674) 
>                       itrs    null    
>                       lock    ReentrantLock  (id=675) 
>                       notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
>                       notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
>                       putIndex        6954    
>                       takeIndex       6954    
>               factoryLock     Object  (id=430)        
>               isDestroyed     false   
>               managedBufferPools      HashSet<E>  (id=431)    
>               memorySegmentSize       32768   
>               numTotalRequiredBuffers 3226    
>               totalNumberOfMemorySegments     10240   
>       numberOfRequestedMemorySegments 60      
>       numberOfRequiredMemorySegments  32      
>       owner   null    
>       registeredListeners     ArrayDeque<E>  (id=421) 
>               elements        Object[16]  (id=685)    
>               head    0       
>               tail    0       
> askToRecycle  false   
> isBlocking    true    
> {noformat}
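
The numbers in this dump can be cross-checked with some simple arithmetic. The constants below are copied from the debugger output above. The class and method names are hypothetical.

* The {{NetworkBufferPool}} manages 10240 x 32768 bytes = 320 MiB.
* 10240 - 3226 = 7014 segments lie beyond the summed minimums ({{numTotalRequiredBuffers}}).
* Each {{join}} pool was granted 60 - 32 = 28 segments above its minimum.
* {{count}} is 0, so none of the 10240 segments is sitting in the global queue.

```java
// Hypothetical helper; every constant is copied from the debugger dump above.
final class PoolArithmetic {
    static final int TOTAL_SEGMENTS = 10240;     // totalNumberOfMemorySegments
    static final int SEGMENT_SIZE = 32768;       // memorySegmentSize, in bytes
    static final int TOTAL_REQUIRED = 3226;      // numTotalRequiredBuffers
    static final int POOL_SIZE = 60;             // currentPoolSize (both join pools)
    static final int POOL_REQUIRED = 32;         // numberOfRequiredMemorySegments
    static final int FIRST_JOIN_REQUESTED = 60;  // numberOfRequestedMemorySegments (first join)
    static final int FIRST_JOIN_IDLE = 0;        // its availableMemorySegments is empty (head == tail)

    /** Total memory managed by the NetworkBufferPool, in bytes. */
    static long totalNetworkBytes() {
        return (long) TOTAL_SEGMENTS * SEGMENT_SIZE;
    }

    /** Segments beyond the sum of all pools' required minimums. */
    static int segmentsAboveMinimum() {
        return TOTAL_SEGMENTS - TOTAL_REQUIRED;
    }

    /** Extra capacity granted to each join pool beyond its minimum. */
    static int extraPerJoinPool() {
        return POOL_SIZE - POOL_REQUIRED;
    }

    /** Segments the first join has requested that are not idle in its pool. */
    static int firstJoinInFlight() {
        return FIRST_JOIN_REQUESTED - FIRST_JOIN_IDLE;
    }

    public static void main(String[] args) {
        System.out.println(totalNetworkBytes() / (1024 * 1024) + " MiB"); // 320 MiB
        System.out.println(segmentsAboveMinimum());                        // 7014
        System.out.println(extraPerJoinPool());                            // 28
        System.out.println(firstJoinInFlight());                           // 60
    }
}
```

The first join's local {{availableMemorySegments}} has {{head}} = {{tail}} = 14. Assuming a JDK 8 {{ArrayDeque}}, that means the deque is empty, so all 60 of its requested segments are currently handed out as {{Buffer}}s rather than idle in the pool.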
> Second join stack trace and variable values from 
> {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]        
> LockSupport.parkNanos(Object, long) line: 215 
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078        
> LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467 
> SingleInputGate.getNextBufferOrEvent() line: 414      
> MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79     
> MutableRecordReader<T>.next(T) line: 34       
> ReaderIterator<T>.next(T) line: 59    
> MutableHashTable$ProbeIterator<PT>.next() line: 1581  
> MutableHashTable<BT,PT>.processProbeIter() line: 457  
> MutableHashTable<BT,PT>.nextRecord() line: 555        
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
> JoinDriver<IT1,IT2,OT>.run() line: 208        
> RegularPactTask<S,OT>.run() line: 489 
> RegularPactTask<S,OT>.invoke() line: 354      
> Task.run() line: 581  
> Thread.run() line: 745        
> {noformat}
> {noformat}
> this  SingleInputGate  (id=693)       
>       bufferPool      LocalBufferPool  (id=706)       
>               availableMemorySegments ArrayDeque<E>  (id=716) 
>                       elements        Object[16]  (id=717)    
>                       head    0       
>                       tail    0       
>               currentPoolSize 60      
>               isDestroyed     false   
>               networkBufferPool       NetworkBufferPool  (id=354)     
>                       allBufferPools  HashSet<E>  (id=424)    
>                       availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
>                               count   0       
>                               items   Object[10240]  (id=674) 
>                               itrs    null    
>                               lock    ReentrantLock  (id=675) 
>                               notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
>                               notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
>                               putIndex        6954    
>                               takeIndex       6954    
>                       factoryLock     Object  (id=430)        
>                       isDestroyed     false   
>                       managedBufferPools      HashSet<E>  (id=431)    
>                       memorySegmentSize       32768   
>                       numTotalRequiredBuffers 3226    
>                       totalNumberOfMemorySegments     10240   
>               numberOfRequestedMemorySegments 0       
>               numberOfRequiredMemorySegments  32      
>               owner   null    
>               registeredListeners     ArrayDeque<E>  (id=718) 
>       channelsWithEndOfPartitionEvents        BitSet  (id=707)        
>       consumedResultId        IntermediateDataSetID  (id=708) 
>       consumedSubpartitionIndex       24      
>       executionId     ExecutionAttemptID  (id=709)    
>       hasReceivedAllEndOfPartitionEvents      false   
>       inputChannels   HashMap<K,V>  (id=710)  
>       inputChannelsWithData   LinkedBlockingQueue<E>  (id=692)        
>               capacity        2147483647      
>               count   AtomicInteger  (id=698) 
>                       value   0       
>               head    LinkedBlockingQueue$Node<E>  (id=701)   
>               last    LinkedBlockingQueue$Node<E>  (id=701)   
>               notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=691)
>               notFull AbstractQueuedSynchronizer$ConditionObject  (id=703)    
>               putLock ReentrantLock  (id=704) 
>               takeLock        ReentrantLock  (id=705) 
>       isReleased      false   
>       jobId   JobID  (id=711) 
>       numberOfInputChannels   32      
>       numberOfUninitializedChannels   0       
>       owningTaskName  "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
>       partitionStateChecker   NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)
>       pendingEvents   ArrayList<E>  (id=713)  
>       registeredListeners     CopyOnWriteArrayList<E>  (id=714)       
>       requestedPartitionsFlag true    
>       requestLock     Object  (id=715)        
>       retriggerLocalRequestTimer      null    
> currentChannel        null    
> {noformat}
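
The second {{join}} is not waiting on a monitor. Its stack trace shows it parked inside {{LinkedBlockingQueue.poll(long, TimeUnit)}}, called from {{SingleInputGate.getNextBufferOrEvent}}, and the dump shows {{inputChannelsWithData}} with {{count}} 0.

Below is a minimal sketch of that waiting pattern. It assumes the gate polls a queue of channels that have data, using a timeout. {{InputGateSketch}}, {{nextChannelWithData}} and the bounded retry count are hypothetical; the bound exists only to keep the example finite.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a consumer waiting for "a channel with data" via a timed poll.
// Not Flink's implementation; names and the retry bound are illustrative only.
final class InputGateSketch {
    // Producers would enqueue the index of an input channel that has data available.
    final LinkedBlockingQueue<Integer> inputChannelsWithData = new LinkedBlockingQueue<>();

    /**
     * Returns the next channel index with data, or -1 if nothing arrived within
     * maxAttempts timed polls of pollMillis each.
     */
    int nextChannelWithData(int maxAttempts, long pollMillis) throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Integer channel = inputChannelsWithData.poll(pollMillis, TimeUnit.MILLISECONDS);
            if (channel != null) {
                return channel;
            }
            // Timed out: between polls the thread is parked in awaitNanos(), as in the trace.
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        InputGateSketch gate = new InputGateSketch();
        // Nothing is ever enqueued, mirroring count = 0 in the dump.
        System.out.println(gate.nextChannelWithData(3, 10)); // -1
        gate.inputChannelsWithData.add(7);
        System.out.println(gate.nextChannelWithData(3, 10)); // 7
    }
}
```

A thread in a loop like this looks alive in a thread dump, because it keeps waking up from {{awaitNanos}}. It still makes no progress as long as no channel is ever enqueued.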



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
