[ https://issues.apache.org/jira/browse/FLINK-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627820#comment-14627820 ]
William Saar commented on FLINK-2355:
-------------------------------------
At the very end I am splitting the input into two streams and passing both to a
print() sink (I tried moving it before the split operation, and that didn't work
either).
I also replaced most of my filters with flatMap operations that drop everything
not matching the filter condition, but had no luck there either.
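
For reference, the filter-to-flatMap replacement described above looks roughly like the sketch below. This is a Flink-free illustration, not the actual job code: `Collector` and `FlatMapper` are hypothetical stand-ins for the corresponding Flink interfaces, and the even-number condition is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FilterAsFlatMap {

    /** Hypothetical stand-in for Flink's Collector, so the sketch runs without Flink. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical stand-in for a flatMap function: zero or more outputs per input. */
    public interface FlatMapper<I, O> {
        void flatMap(I value, Collector<O> out);
    }

    /** Wraps a filter condition as a flatMap that emits matches and drops everything else. */
    public static <T> FlatMapper<T, T> filterAsFlatMap(Predicate<T> condition) {
        return (value, out) -> {
            if (condition.test(value)) {
                out.collect(value);
            }
            // Non-matching records are simply never collected.
        };
    }

    /** Runs the flatMap over a list and gathers everything it emits. */
    public static <T> List<T> apply(FlatMapper<T, T> op, List<T> input) {
        List<T> result = new ArrayList<>();
        for (T value : input) {
            op.flatMap(value, result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up condition: keep even numbers only.
        FlatMapper<Integer, Integer> evens = filterAsFlatMap(n -> n % 2 == 0);
        System.out.println(apply(evens, Arrays.asList(1, 2, 3, 4, 5, 6))); // prints [2, 4, 6]
    }
}
```

Both variants emit the same records downstream, so swapping one for the other only changes which operator type appears in the chain.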
> Job hanging in collector, waiting for request buffer
> ----------------------------------------------------
>
> Key: FLINK-2355
> URL: https://issues.apache.org/jira/browse/FLINK-2355
> Project: Flink
> Issue Type: Bug
> Affects Versions: master
> Reporter: William Saar
>
> Running locally on a machine with 8 threads.
> Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)
> owns: SpanningRecordSerializer<T> (id=533)
> waited by: Daemon Thread [Thread-173] (Suspended)
> waiting for: ArrayDeque<E> (id=534)
> Object.wait(long) line: not available [native method]
> LocalBufferPool.requestBuffer(boolean) line: 163
> LocalBufferPool.requestBufferBlocking() line: 133
> StreamRecordWriter<T>(RecordWriter<T>).emit(T) line: 92
> StreamRecordWriter<T>.emit(T) line: 58
> StreamOutput<OUT>.collect(OUT) line: 62
> CollectorWrapper<OUT>.collect(OUT) line: 40
> StreamFilter<IN>.processElement(IN) line: 34
> OutputHandler$CopyingOperatorCollector<T>.collect(T) line: 278
> CollectorWrapper<OUT>.collect(OUT) line: 40
> IteratedDataModelOp<I,O>.lambda$0(Collector, InternalMessage) line: 102
> 437981089.accept(Object) line: not available
> ArrayList<E>.forEach(Consumer<? super E>) line: not available
> IteratedDataModelOp<I,O>.processInput(I, Collector<MessageWrapper<I,O>>) line: 99
> IteratedDataModelOp<I,O>.flatMap(MessageWrapper<I,O>, Collector<MessageWrapper<I,O>>) line: 70
> IteratedDataModelOp<I,O>.flatMap(Object, Collector) line: 1
> StreamFlatMap<IN,OUT>.processElement(IN) line: 35
> OneInputStreamTask<IN,OUT>.invoke() line: 103
> Task.run() line: 567
> Thread.run() line: not available
>
> Daemon Thread [Thread-173] (Suspended)
> waiting for: SpanningRecordSerializer<T> (id=533)
> owned by: Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)
> waiting for: ArrayDeque<E> (id=534)
> StreamRecordWriter<T>(RecordWriter<T>).flush() line: 149
> StreamRecordWriter$OutputFlusher.run() line: 90
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)