Hi!

Interesting catch. I think the behaviour on the current master branch is
broken. The chance of losing some records on `endInput` is simply a critical
bug. The limited buffer size is also problematic, as it can surprise users.
Having said that, the newly proposed behaviour without any buffer is
problematic too.

Maybe whenever the user wants to collect the results, before or when
calling `env.execute()`, we should spawn a new thread that would
asynchronously collect results from the `CollectSinkFunction`? I'm not sure,
but maybe hooking this logic up to wherever `CollectStreamSink` is used is
the way to go?
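
For illustration, the background-collector idea could be sketched roughly
like this. It is plain Java with no Flink classes: the bounded queue stands
in for the buffer of `CollectSinkFunction`, `runJob` stands in for a blocking
`env.execute()`, and all names (`AsyncCollectSketch`, `runJob`, `collectAll`,
the `END` sentinel) are made up for this example and are not existing Flink
API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

class AsyncCollectSketch {
    // Hypothetical end-of-input marker; not part of Flink's collect protocol.
    private static final int END = -1;

    /** Stand-in for the job: emits records into a bounded buffer, then signals end of input. */
    static void runJob(BlockingQueue<Integer> buffer, int numRecords) throws InterruptedException {
        for (int i = 0; i < numRecords; i++) {
            buffer.put(i); // blocks while the buffer is full
        }
        buffer.put(END);
    }

    /** Starts the consumer before the blocking "execute" call, then waits for all results. */
    static List<Integer> collectAll(int numRecords, int bufferSize) throws Exception {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferSize);

        // Background consumer: drains the buffer until it sees the end marker.
        CompletableFuture<List<Integer>> results = CompletableFuture.supplyAsync(() -> {
            List<Integer> out = new ArrayList<>();
            try {
                for (int v = buffer.take(); v != END; v = buffer.take()) {
                    out.add(v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return out;
        });

        runJob(buffer, numRecords); // would deadlock here without the consumer above
        return results.get();       // returns only once every record has been read
    }
}
```

With the consumer started first, something like
`AsyncCollectSketch.collectAll(1_000_000, 4)` completes even though the
result is far larger than the buffer. Without it, the producer would block on
the first `put` that finds the buffer full.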

One thing I'm not sure about is whether there are scenarios/existing code
paths where someone wants to use the
`CollectSinkFunction` but doesn't want the results to be read
automatically? And if there are, can we tell them apart?

Best,
Piotrek

On Mon, 12 Feb 2024 at 08:48, Alexey Leonov-Vendrovskiy <vendrov...@gmail.com>
wrote:

> Hey all,
>
> We propose to slightly change the behavior of the CollectSinkFunction
> <
> https://github.com/apache/flink/blob/6f4d31f1b79afbde6c093b5d40ac83fe7524e303/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L129
> >
> and make it wait until all results in the buffer have been consumed by the
> client before shutting down.
>
> Overall protocol and all the other behavior stay the same.
>
> This would be a way to guarantee result availability upon the job
> completion. Today, the tail of the result is stored in an accumulator, and
> gets stored in the job manager. There is an opportunity for this part of
> the result to get lost, after the job is claimed to be
> successfully "completed". Waiting till all the results are consumed while
> the job is running is a natural way to achieve availability. Once the job
> is done, we are certain all the results are consumed.
>
> This change would be achieved by overriding the endInput() method
> in CollectSinkOperator, and passing the call to CollectSinkFunction to wait
> till the buffer is empty.
>
> The old behavior could be enabled via a configuration flag (to be added).
>
> A notable side-effect of the change is that any invocation
> of StreamExecutionEnvironment.execute() (synchronous execution) with a
> pipeline with CollectSinkFunction in it, would effectively block waiting
> for the results to get consumed. This would require running the consumer on
> a different thread. Note, though, that this is *already the case* when the
> result is larger than what can fit into the CollectSinkFunction's buffer.
> Take a look at flink-end-to-end-tests/test-scripts/test_quickstarts.sh in
> the current state of the repo: if we change the parameter numRecords to be
> 1,000,000, the test locks and waits forever. So the only difference with
> the change would be that, in similar setups, it would wait on any buffer
> size greater than 0. This makes the behavior consistent for results of any
> non-zero size.
>
>
> Let me know your thoughts.
>
> Thanks,
> Alexey
>
