Hey, Piotrek,

Thanks for the response and thoughts!

I don't think it is possible to tell apart the cases where someone would not
want the results back from an instantiated CollectSinkFunction.
It is not clear why anyone would want that, unless it is a test flow
that deliberately does something unusual.

Regards,
-Alexey


On Wed, Feb 14, 2024 at 12:27 AM Piotr Nowojski <pnowoj...@apache.org>
wrote:

> Hi!
>
> Interesting catch. I think the current master branch behaviour is broken.
> The chance to lose some records on `endInput` is simply a critical bug.
> The limited buffer size is still problematic, as it can surprise users.
> Having said that, the newly proposed behaviour without any buffer is also
> problematic.
>
> Maybe whenever the user wants to collect the results, before or when
> calling `env.execute()`, we should spawn a new thread that would
> asynchronously collect results from the `CollectSinkFunction`? I'm not
> sure, but maybe hooking this logic up to every place where
> `CollectStreamSink` is used is the way to go?
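
To make the suggestion concrete, here is a rough sketch in plain Java rather than Flink's API. A bounded `ArrayBlockingQueue` stands in for the sink's buffer, `runJob` stands in for a blocking `env.execute()`, and `END_MARKER` is a made-up end-of-results signal; all of these names are hypothetical. The only point is the ordering: the collector thread is started before the blocking call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncCollectSketch {
    // Hypothetical end-of-results marker; a real sink would signal completion differently.
    static final Integer END_MARKER = Integer.MIN_VALUE;

    // Stand-in for a synchronous env.execute(): the "sink" pushes every record into a
    // bounded buffer and blocks whenever the buffer is full.
    static void runJob(BlockingQueue<Integer> buffer, int numRecords) throws InterruptedException {
        for (int i = 0; i < numRecords; i++) {
            buffer.put(i);
        }
        buffer.put(END_MARKER);
    }

    // Starts the consumer on its own thread before the blocking job call,
    // then returns the collected results once the job has finished.
    static List<Integer> executeAndCollect(int bufferCapacity, int numRecords)
            throws InterruptedException, ExecutionException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferCapacity);
        ExecutorService collectorThread = Executors.newSingleThreadExecutor();
        try {
            Future<List<Integer>> results = collectorThread.submit(() -> {
                List<Integer> collected = new ArrayList<>();
                for (Integer next = buffer.take(); !next.equals(END_MARKER); next = buffer.take()) {
                    collected.add(next);
                }
                return collected;
            });
            // Without the collector thread this call would block once the buffer fills up.
            runJob(buffer, numRecords);
            return results.get();
        } finally {
            // Interrupts the collector if it is still waiting (e.g. the job failed midway).
            collectorThread.shutdownNow();
        }
    }
}
```

With a capacity of 4, `executeAndCollect(4, 1000)` returns all 1,000 records, whereas calling `runJob` on the same thread with no collector would block at the fifth record.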
>
> One thing I'm not sure about is whether there are scenarios or existing
> code paths where someone wants to use the `CollectSinkFunction` but doesn't
> want the results to be read automatically. And if there are, can we tell
> them apart?
>
> Best,
> Piotrek
>
> On Mon, Feb 12, 2024 at 08:48 Alexey Leonov-Vendrovskiy <vendrov...@gmail.com>
> wrote:
>
> > Hey all,
> >
> > We propose to slightly change the behavior of the CollectSinkFunction
> > <https://github.com/apache/flink/blob/6f4d31f1b79afbde6c093b5d40ac83fe7524e303/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L129>
> > and make it wait until all results in the buffer have been consumed by the
> > client before shutting down.
> >
> > The overall protocol and all other behavior stay the same.
> >
> > This would be a way to guarantee result availability upon job completion.
> > Today, the tail of the result is stored in an accumulator and kept on the
> > JobManager, and this part of the result can get lost even after the job
> > has been reported as successfully "completed". Waiting until all results
> > are consumed while the job is still running is a natural way to achieve
> > availability: once the job is done, we are certain that all results have
> > been consumed.
> >
> > This change would be achieved by overriding the endInput() method in
> > CollectSinkOperator and forwarding the call to CollectSinkFunction, which
> > would wait until the buffer is empty.
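
A minimal sketch of the drain-before-finish idea, assuming a simplified buffer rather than CollectSinkFunction's real internals. `DrainingResultBuffer`, its methods, and the `legacyMode` switch (a stand-in for the proposed, not yet existing, configuration flag) are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the result buffer inside CollectSinkFunction; not Flink code.
class DrainingResultBuffer<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int capacity;
    // Stand-in for the configuration flag that would restore the old behavior.
    private final boolean legacyMode;

    DrainingResultBuffer(int capacity, boolean legacyMode) {
        this.capacity = capacity;
        this.legacyMode = legacyMode;
    }

    // Called by the sink for every (non-null) record; blocks while the buffer is full.
    synchronized void add(T record) throws InterruptedException {
        while (buffer.size() >= capacity) {
            wait();
        }
        buffer.addLast(record);
    }

    // Called when the client fetches results; returns null if nothing is buffered.
    synchronized T poll() {
        T next = buffer.pollFirst();
        if (next != null) {
            notifyAll(); // wakes a blocked add() or endInput()
        }
        return next;
    }

    // Proposed endInput(): the task does not finish until the client has read everything.
    synchronized void endInput() throws InterruptedException {
        if (legacyMode) {
            return; // old behavior: return immediately, leaving any unread tail behind
        }
        while (!buffer.isEmpty()) {
            wait();
        }
    }

    synchronized int size() {
        return buffer.size();
    }
}
```

In this model `endInput()` returns only after the client has polled every buffered record; with `legacyMode` set it returns at once and the tail stays in the buffer, which is where the current code hands it off to the accumulator.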
> >
> > The old behavior could be enabled via a configuration flag (to be added).
> >
> > A notable side effect of the change is that any invocation of
> > StreamExecutionEnvironment.execute() (synchronous execution) on a pipeline
> > containing a CollectSinkFunction would effectively block until the results
> > are consumed. This would require running the consumer on a different
> > thread. Note, though, that this is *already the case* when the result is
> > larger than what fits into the CollectSinkFunction's buffer. Take a look at
> > flink-end-to-end-tests/test-scripts/test_quickstarts.sh in the current
> > state of the repo: if we change the parameter numRecords to 1,000,000, the
> > test locks up and waits forever. So the only difference with the change
> > would be that in similar setups it would wait whenever the buffer is
> > non-empty (buffer size > 0). This makes the behavior consistent for results
> > of any non-zero size.
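
The argument above can be modeled in a few lines, assuming no consumer runs concurrently with `execute()`. `wouldBlockWithoutConsumer` is a hypothetical helper, and the capacities below are arbitrary examples, not Flink's actual buffer size.

```java
import java.util.concurrent.ArrayBlockingQueue;

class BlockingModelSketch {
    // Models a synchronous execute() with no concurrent consumer and returns true if the
    // sink would block forever. drainOnEndInput = true models the proposed endInput()
    // that waits for an empty buffer. bufferCapacity must be at least 1.
    static boolean wouldBlockWithoutConsumer(int bufferCapacity, int numRecords, boolean drainOnEndInput) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferCapacity);
        for (int i = 0; i < numRecords; i++) {
            if (!buffer.offer(i)) {
                return true; // buffer full and nobody reading: this blocks today as well
            }
        }
        // End of input: the proposal additionally waits until every buffered record is read.
        return drainOnEndInput && !buffer.isEmpty();
    }
}
```

For a 100-record buffer, today's behavior blocks only when the result exceeds the buffer (`wouldBlockWithoutConsumer(100, 1000, false)` is `true`, `(100, 50, false)` is `false`), while with draining on `endInput()` any non-empty result blocks (`(100, 50, true)` is `true`) and an empty one never does (`(100, 0, true)` is `false`).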
> >
> >
> > Let me know your thoughts.
> >
> > Thanks,
> > Alexey
> >
>
