[
https://issues.apache.org/jira/browse/FLINK-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627750#comment-14627750
]
Ufuk Celebi commented on FLINK-2355:
------------------------------------
OK. There can be an issue if you are not consuming the input in downstream
operators. Is this something you are doing? It's blocked at the end of the
pipeline after the flatmap (with ID 17; the one going to two filters). If for
example one of the down stream filter operations (for example with ID 18) is
not consuming the input, it can easily happen that the pipeline blocks.
Is your global windowing magic ;) working for time based windows? If yes, there
is good news: some committers are working on making this work out of the box
(so no magic will be necessary soon).
> Job hanging in collector, waiting for request buffer
> ----------------------------------------------------
>
> Key: FLINK-2355
> URL: https://issues.apache.org/jira/browse/FLINK-2355
> Project: Flink
> Issue Type: Bug
> Affects Versions: master
> Reporter: William Saar
>
> Running locally on a machine with 8 threads.
> Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream
> Sink, Stream Sink)) (6/8)] (Suspended)
> owns: SpanningRecordSerializer<T> (id=533)
> waited by: Daemon Thread [Thread-173] (Suspended)
> waiting for: ArrayDeque<E> (id=534)
> Object.wait(long) line: not available [native method]
> LocalBufferPool.requestBuffer(boolean) line: 163
> LocalBufferPool.requestBufferBlocking() line: 133
> StreamRecordWriter<T>(RecordWriter<T>).emit(T) line: 92
> StreamRecordWriter<T>.emit(T) line: 58
> StreamOutput<OUT>.collect(OUT) line: 62
> CollectorWrapper<OUT>.collect(OUT) line: 40
> StreamFilter<IN>.processElement(IN) line: 34
> OutputHandler$CopyingOperatorCollector<T>.collect(T) line: 278
> CollectorWrapper<OUT>.collect(OUT) line: 40
> IteratedDataModelOp<I,O>.lambda$0(Collector, InternalMessage) line: 102
> 437981089.accept(Object) line: not available
> ArrayList<E>.forEach(Consumer<? super E>) line: not available
> IteratedDataModelOp<I,O>.processInput(I,
> Collector<MessageWrapper<I,O>>) line: 99
> IteratedDataModelOp<I,O>.flatMap(MessageWrapper<I,O>,
> Collector<MessageWrapper<I,O>>) line: 70
> IteratedDataModelOp<I,O>.flatMap(Object, Collector) line: 1
> StreamFlatMap<IN,OUT>.processElement(IN) line: 35
> OneInputStreamTask<IN,OUT>.invoke() line: 103
> Task.run() line: 567
> Thread.run() line: not available
>
> Daemon Thread [Thread-173] (Suspended)
> waiting for: SpanningRecordSerializer<T> (id=533)
> owned by: Daemon Thread [Flat Map -> (Filter, Filter -> Flat
> Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)
> waiting for: ArrayDeque<E> (id=534)
> StreamRecordWriter<T>(RecordWriter<T>).flush() line: 149
> StreamRecordWriter$OutputFlusher.run() line: 90
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)