[ 
https://issues.apache.org/jira/browse/FLINK-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627737#comment-14627737
 ] 

William Saar commented on FLINK-2355:
-------------------------------------

Thanks, just wanted to check if this was something simple/obvious. 

I am experimenting with some global windowing magic in the first flatMap 
operation, where I buffer some input, block the operation, and occasionally 
feed data to the collector, so it is not exactly the recommended way to 
operate. However, it seemed to work better with my week-old code, before I 
updated from the repo yesterday, so I was thinking that it may be some recent 
locking problem.

I am talking privately with other Flink committers and may coordinate digging 
with them.

> Job hanging in collector, waiting for request buffer
> ----------------------------------------------------
>
>                 Key: FLINK-2355
>                 URL: https://issues.apache.org/jira/browse/FLINK-2355
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: master
>            Reporter: William Saar
>
> Running locally on a machine with 8 threads.
> Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream 
> Sink, Stream Sink)) (6/8)] (Suspended)    
>       owns: SpanningRecordSerializer<T>  (id=533)     
>               waited by: Daemon Thread [Thread-173] (Suspended)       
>       waiting for: ArrayDeque<E>  (id=534)    
>       Object.wait(long) line: not available [native method]   
>       LocalBufferPool.requestBuffer(boolean) line: 163        
>       LocalBufferPool.requestBufferBlocking() line: 133       
>       StreamRecordWriter<T>(RecordWriter<T>).emit(T) line: 92 
>       StreamRecordWriter<T>.emit(T) line: 58  
>       StreamOutput<OUT>.collect(OUT) line: 62 
>       CollectorWrapper<OUT>.collect(OUT) line: 40     
>       StreamFilter<IN>.processElement(IN) line: 34    
>       OutputHandler$CopyingOperatorCollector<T>.collect(T) line: 278  
>       CollectorWrapper<OUT>.collect(OUT) line: 40     
>       IteratedDataModelOp<I,O>.lambda$0(Collector, InternalMessage) line: 102 
>       437981089.accept(Object) line: not available    
>       ArrayList<E>.forEach(Consumer<? super E>) line: not available   
>       IteratedDataModelOp<I,O>.processInput(I, 
> Collector<MessageWrapper<I,O>>) line: 99       
>       IteratedDataModelOp<I,O>.flatMap(MessageWrapper<I,O>, 
> Collector<MessageWrapper<I,O>>) line: 70  
>       IteratedDataModelOp<I,O>.flatMap(Object, Collector) line: 1     
>       StreamFlatMap<IN,OUT>.processElement(IN) line: 35       
>       OneInputStreamTask<IN,OUT>.invoke() line: 103   
>       Task.run() line: 567    
>       Thread.run() line: not available        
>     
> Daemon Thread [Thread-173] (Suspended)        
>       waiting for: SpanningRecordSerializer<T>  (id=533)      
>               owned by: Daemon Thread [Flat Map -> (Filter, Filter -> Flat 
> Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)    
>                       waiting for: ArrayDeque<E>  (id=534)    
>       StreamRecordWriter<T>(RecordWriter<T>).flush() line: 149        
>       StreamRecordWriter$OutputFlusher.run() line: 90 
>     
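
Reading the dump above: the task thread owns the SpanningRecordSerializer 
monitor while it waits on the ArrayDeque for a free buffer, and Thread-173 
(the output flusher) is blocked on that same serializer monitor. The sketch 
below reproduces only that lock pattern in plain Java. It is not Flink code: 
the class BufferWaitSketch and its methods emit, flush, and recycle are 
hypothetical stand-ins, and it uses an untimed wait() where the trace shows 
Object.wait(long).

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for the lock pattern in the dump; not Flink code.
class BufferWaitSketch {
    // Plays the role of the SpanningRecordSerializer monitor.
    private final Object serializer = new Object();
    // Plays the role of the ArrayDeque of free buffers in the buffer pool.
    private final ArrayDeque<byte[]> availableBuffers = new ArrayDeque<>();

    // Like emit(): takes the serializer lock, then waits for a free buffer.
    void emit() throws InterruptedException {
        synchronized (serializer) {
            synchronized (availableBuffers) {
                while (availableBuffers.isEmpty()) {
                    // wait() releases availableBuffers but NOT serializer.
                    availableBuffers.wait();
                }
                availableBuffers.poll();
            }
        }
    }

    // Like flush(): needs the serializer lock, so it blocks behind emit().
    void flush() {
        synchronized (serializer) {
            // Nothing to do in the sketch; only the locking matters.
        }
    }

    // Returns a buffer to the pool and wakes the waiting emitter.
    void recycle(byte[] buffer) {
        synchronized (availableBuffers) {
            availableBuffers.add(buffer);
            availableBuffers.notifyAll();
        }
    }

    // Starts both threads, records their states while stuck, then unblocks them.
    static Thread.State[] observeStates() throws InterruptedException {
        BufferWaitSketch sketch = new BufferWaitSketch();
        Thread task = new Thread(() -> {
            try {
                sketch.emit();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task");
        task.setDaemon(true);
        task.start();
        while (task.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        Thread flusher = new Thread(sketch::flush, "flusher");
        flusher.setDaemon(true);
        flusher.start();
        while (flusher.getState() != Thread.State.BLOCKED) {
            Thread.sleep(1);
        }
        Thread.State[] states = { task.getState(), flusher.getState() };
        sketch.recycle(new byte[0]); // a buffer comes back, both threads finish
        task.join();
        flusher.join();
        return states;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.State[] states = observeStates();
        System.out.println("task=" + states[0] + " flusher=" + states[1]);
    }
}
```

In the sketch the hang ends only because recycle() is called from a third 
thread. If nothing ever returns a buffer, both threads stay stuck, which 
matches the shape of the dump.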



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
