[ https://issues.apache.org/jira/browse/FLINK-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627695#comment-14627695 ]

William Saar commented on FLINK-2355:
-------------------------------------

No Co* tasks. Here is the plan
{"nodes":[
{"id":1,"type":"Read Text File Source","pact":"Data Source","contents":"Read Text File Source","parallelism":8},
{"id":2,"type":"Map","pact":"Data Stream","contents":"Map","parallelism":8,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},
{"id":3,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":2,"ship_strategy":"FORWARD","side":"second"}]},
{"id":4,"type":"Flat Map","pact":"Data Stream","contents":"Flat Map","parallelism":8,"predecessors":[{"id":3,"ship_strategy":"FORWARD","side":"second"}]},
{"id":5,"type":"Flat Map","pact":"Data Stream","contents":"Flat Map","parallelism":8,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]},
{"id":6,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]},
{"id":7,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]},
{"id":8,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]},
{"id":9,"type":"Flat Map","pact":"Data Stream","contents":"Flat Map","parallelism":8,"predecessors":[{"id":6,"ship_strategy":"GROUPBY","side":"second"},{"id":7,"ship_strategy":"BROADCAST","side":"second"},{"id":8,"ship_strategy":"FORWARD","side":"second"}]},
{"id":10,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":9,"ship_strategy":"FORWARD","side":"second"}]},
{"id":11,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":9,"ship_strategy":"FORWARD","side":"second"}]},
{"id":12,"type":"Flat Map","pact":"Data Stream","contents":"Flat Map","parallelism":8,"predecessors":[{"id":11,"ship_strategy":"FORWARD","side":"second"}]},
{"id":13,"type":"Flat Map","pact":"Data Stream","contents":"Flat Map","parallelism":8,"predecessors":[{"id":12,"ship_strategy":"FORWARD","side":"second"}]},
{"id":14,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":13,"ship_strategy":"FORWARD","side":"second"}]},
{"id":15,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":13,"ship_strategy":"FORWARD","side":"second"}]},
{"id":16,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":13,"ship_strategy":"FORWARD","side":"second"}]},
{"id":17,"type":"Flat Map","pact":"Data Stream","contents":"Flat Map","parallelism":8,"predecessors":[{"id":14,"ship_strategy":"GROUPBY","side":"second"},{"id":15,"ship_strategy":"BROADCAST","side":"second"},{"id":16,"ship_strategy":"FORWARD","side":"second"}]},
{"id":18,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":17,"ship_strategy":"FORWARD","side":"second"}]},
{"id":19,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":17,"ship_strategy":"FORWARD","side":"second"}]},
{"id":20,"type":"Flat Map","pact":"Data Stream","contents":"Flat Map","parallelism":8,"predecessors":[{"id":19,"ship_strategy":"FORWARD","side":"second"}]},
{"id":21,"type":"Filter","pact":"Data Stream","contents":"Filter","parallelism":8,"predecessors":[{"id":20,"ship_strategy":"FORWARD","side":"second"}]},
{"id":22,"type":"Stream Sink","pact":"Data Stream","contents":"Stream Sink","parallelism":8,"predecessors":[{"id":21,"ship_strategy":"FORWARD","side":"second"}]},
{"id":23,"type":"Stream Sink","pact":"Data Stream","contents":"Stream Sink","parallelism":8,"predecessors":[{"id":21,"ship_strategy":"FORWARD","side":"second"}]}
]}

> Job hanging in collector, waiting for request buffer
> ----------------------------------------------------
>
>                 Key: FLINK-2355
>                 URL: https://issues.apache.org/jira/browse/FLINK-2355
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: master
>            Reporter: William Saar
>
> Running locally on a machine with 8 threads.
> Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)
>       owns: SpanningRecordSerializer<T>  (id=533)     
>               waited by: Daemon Thread [Thread-173] (Suspended)       
>       waiting for: ArrayDeque<E>  (id=534)    
>       Object.wait(long) line: not available [native method]   
>       LocalBufferPool.requestBuffer(boolean) line: 163        
>       LocalBufferPool.requestBufferBlocking() line: 133       
>       StreamRecordWriter<T>(RecordWriter<T>).emit(T) line: 92 
>       StreamRecordWriter<T>.emit(T) line: 58  
>       StreamOutput<OUT>.collect(OUT) line: 62 
>       CollectorWrapper<OUT>.collect(OUT) line: 40     
>       StreamFilter<IN>.processElement(IN) line: 34    
>       OutputHandler$CopyingOperatorCollector<T>.collect(T) line: 278  
>       CollectorWrapper<OUT>.collect(OUT) line: 40     
>       IteratedDataModelOp<I,O>.lambda$0(Collector, InternalMessage) line: 102 
>       437981089.accept(Object) line: not available    
>       ArrayList<E>.forEach(Consumer<? super E>) line: not available   
>       IteratedDataModelOp<I,O>.processInput(I, Collector<MessageWrapper<I,O>>) line: 99
>       IteratedDataModelOp<I,O>.flatMap(MessageWrapper<I,O>, Collector<MessageWrapper<I,O>>) line: 70
>       IteratedDataModelOp<I,O>.flatMap(Object, Collector) line: 1     
>       StreamFlatMap<IN,OUT>.processElement(IN) line: 35       
>       OneInputStreamTask<IN,OUT>.invoke() line: 103   
>       Task.run() line: 567    
>       Thread.run() line: not available        
>     
> Daemon Thread [Thread-173] (Suspended)        
>       waiting for: SpanningRecordSerializer<T>  (id=533)      
>               owned by: Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)
>                       waiting for: ArrayDeque<E>  (id=534)    
>       StreamRecordWriter<T>(RecordWriter<T>).flush() line: 149        
>       StreamRecordWriter$OutputFlusher.run() line: 90 
>     


