Interesting. The only event I can picture is this SenderHintEvent. Maybe
there is a corner case.

Can you open a JIRA, I'll take a look.


On Thu, Jul 10, 2014 at 2:59 PM, Gyula Fóra <[email protected]> wrote:

> Hey,
>
> Until now, after every emit to the outputs we flushed them using the
> .flush() method of the recordwriter. Now we removed this flush() call and
> we have two interesting observations:
>
> First of all we dont send enough records the source finishes but the output
> buffer never gets flushed.
>
> Secondly if we generate a simple datastream from lets say the first 1500
> <
> https://github.com/stratosphere/stratosphere-streaming/blob/output-flush/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/PrintTest.java#L48
> >
> numbers we get an exception in the InputGates (after lets say a hundred
> records): java.lang.IllegalStateException: Channel received an event before
> completing the current partial record.
>
> java.lang.IllegalStateException: Channel received an event before
> completing the current partial record.
> at
>
> eu.stratosphere.runtime.io.channels.InputChannel.readRecord(InputChannel.java:177)
> at
> eu.stratosphere.runtime.io.gates.InputGate.readRecord(InputGate.java:173)
> at
>
> eu.stratosphere.streaming.api.streamcomponent.StreamRecordReader.hasNext(StreamRecordReader.java:96)
> at
>
> eu.stratosphere.streaming.api.streamcomponent.AbstractStreamComponent.invokeRecords(AbstractStreamComponent.java:255)
> at
>
> eu.stratosphere.streaming.api.streamcomponent.StreamSink.invoke(StreamSink.java:74)
> at
>
> eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
> at java.lang.Thread.run(Unknown Source)
>
>
>
> This works perfectly if we flush the outputs after the emits.
>
> Any ideas what might cause this problem?
>
> Regards,
> Gyula
>

Reply via email to