Hey Alexander, 

Making the datagen source connector easier to use is really helpful when
doing a PoC or demo.
I was also wondering whether it would be possible to produce a changelog
stream with the datagen source, so that new Flink developers can practice
Flink SQL with CDC data using the Flink SQL Client CLI.
In the flink-examples-table module, the ChangelogSocketExample class [1]
shows how to ingest insert and delete records via the 'nc' command. Could we
support producing a changelog stream with the new datagen source?
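To illustrate the kind of data I mean, here is a stand-alone Java sketch
(not Flink code) of a generator function that maps a sequence index to a
changelog event, so that every key is inserted, updated and finally deleted.
ChangelogGenerator, ChangeKind and ChangeEvent are hypothetical names;
ChangeKind only mirrors the +I/-U/+U/-D change kinds used by Flink SQL and
is not Flink's RowKind class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical, Flink-free sketch of an index -> changelog-event generator.
class ChangelogGenerator {
    // Stand-alone mirror of the four change kinds in Flink SQL changelogs;
    // this is NOT Flink's RowKind class.
    enum ChangeKind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        ChangeKind(String shortString) {
            this.shortString = shortString;
        }
    }

    record ChangeEvent(ChangeKind kind, long key, String value) {
        @Override
        public String toString() {
            return kind.shortString + "[" + key + ", " + value + "]";
        }
    }

    // Every key runs through the same four-step lifecycle:
    // insert v1, retract v1, add v2, delete v2.
    static final int EVENTS_PER_KEY = 4;

    // Generator function: maps one sequence index to one changelog event.
    static final LongFunction<ChangeEvent> GENERATOR = index -> {
        long key = index / EVENTS_PER_KEY;
        int step = (int) (index % EVENTS_PER_KEY);
        return switch (step) {
            case 0 -> new ChangeEvent(ChangeKind.INSERT, key, "v1");
            case 1 -> new ChangeEvent(ChangeKind.UPDATE_BEFORE, key, "v1");
            case 2 -> new ChangeEvent(ChangeKind.UPDATE_AFTER, key, "v2");
            default -> new ChangeEvent(ChangeKind.DELETE, key, "v2");
        };
    };

    // Produces the first `count` events of the changelog stream.
    static List<ChangeEvent> generate(long count) {
        List<ChangeEvent> events = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            events.add(GENERATOR.apply(i));
        }
        return events;
    }

    public static void main(String[] args) {
        // Prints +I[0, v1], -U[0, v1], +U[0, v2], -D[0, v2], +I[1, v1], ...
        generate(8).forEach(System.out::println);
    }
}
```

Because each event depends only on its index, a generator like this would
stay deterministic and easy to reason about when checking query results.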


[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79


Best regards,
Xianxun


On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:
I looked a bit further, and it seems it should actually be easier than I
initially thought: SourceReader extends the CheckpointListener interface,
so with a custom implementation it should be possible to achieve similar
results. A prototype that I have for the generator uses an
IteratorSourceReader under the hood by default, but we could consider
adding the ability to supply something like a
DataGeneratorSourceReaderFactory, which would allow provisioning the
DataGeneratorSource with customized implementations for cases like this.
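To make the idea a bit more concrete, here is a Flink-free Java model (a
sketch, not the prototype itself) of a reader that emits one batch of
records per checkpoint cycle, taking a List<List<T>> like BoundedTestSource's
elementsPerCheckpoint, and only moves on when notifyCheckpointComplete()
fires. SimpleReader is a cut-down stand-in for SourceReader, and
ReaderFactory plays the role of the hypothetical
DataGeneratorSourceReaderFactory; neither is a real Flink type.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Flink-free model of a checkpoint-paced reader; all names are hypothetical.
class CheckpointPacedDemo {
    // Minimal stand-in for the parts of a SourceReader that matter here:
    // polling for records and the checkpoint-completion callback.
    interface SimpleReader<T> {
        T pollNext(); // returns null when nothing is available right now

        void notifyCheckpointComplete(long checkpointId);
    }

    // Hypothetical counterpart of a "DataGeneratorSourceReaderFactory":
    // the source would call it instead of hard-wiring one reader type.
    interface ReaderFactory<T> {
        SimpleReader<T> createReader();
    }

    // Emits one batch per checkpoint cycle: once a batch is drained, the
    // reader stays idle until a checkpoint completes, then releases the next.
    static final class CheckpointPacedReader<T> implements SimpleReader<T> {
        private final Deque<List<T>> pendingBatches;
        private final Deque<T> currentBatch = new ArrayDeque<>();

        CheckpointPacedReader(List<List<T>> elementsPerCheckpoint) {
            this.pendingBatches = new ArrayDeque<>(elementsPerCheckpoint);
            releaseNextBatch();
        }

        private void releaseNextBatch() {
            List<T> next = pendingBatches.pollFirst();
            if (next != null) {
                currentBatch.addAll(next);
            }
        }

        @Override
        public T pollNext() {
            return currentBatch.pollFirst();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Only advance once the current batch has been fully emitted.
            if (currentBatch.isEmpty()) {
                releaseNextBatch();
            }
        }
    }

    public static void main(String[] args) {
        ReaderFactory<String> factory =
                () -> new CheckpointPacedReader<>(List.of(List.of("a", "b"), List.of("c")));
        SimpleReader<String> reader = factory.createReader();
        System.out.println(reader.pollNext()); // a
        System.out.println(reader.pollNext()); // b
        System.out.println(reader.pollNext()); // null: waiting for a checkpoint
        reader.notifyCheckpointComplete(1L);
        System.out.println(reader.pollNext()); // c
    }
}
```

One simplification: a checkpoint that completes before the current batch
has been drained is simply ignored here; a real SourceReader would also need
to capture the not-yet-emitted elements in snapshotState().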

Best,
Alexander Fedulov

On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Steven,

This is going to be tricky since, in the new Source API, the checkpointing
aspects that you based your logic on are pushed further away from the
low-level interfaces responsible for handling data and splits [1]. At the
same time, the SourceCoordinatorProvider is hardwired into the internals
of the framework, so I don't think it will be possible to provide a
customized implementation for testing purposes.

The only way I currently see to tie data generation to checkpointing in
the new Source API is via the SplitEnumerator checkpoint serializer (the
getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
possible to share a variable that is visible both to the generator function
and to the serializer, and to manipulate it whenever the serialize() method
gets called upon a checkpoint request. That said, you still won't get the
notifications of successful checkpoints that you currently rely on (this
information is only available to the SourceCoordinator).
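A rough, Flink-free illustration of that trick: a counter shared between a
serializer and a generator function, bumped as a side effect of
serialize(). VersionedSerializer only loosely mirrors the
getVersion()/serialize()/deserialize() shape of Flink's
SimpleVersionedSerializer, and all names here are hypothetical. Since the
enumerator runs on the JobManager and the readers on TaskManagers, a shared
static like this presumably only works when both live in one JVM, e.g. in a
local test cluster.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

// Flink-free model of the "shared variable" idea; names are hypothetical.
class SerializerHookDemo {
    // Local interface loosely shaped like Flink's SimpleVersionedSerializer,
    // so that this sketch compiles without Flink on the classpath.
    interface VersionedSerializer<E> {
        int getVersion();

        byte[] serialize(E obj);

        E deserialize(int version, byte[] serialized);
    }

    // Shared between the enumerator-state serializer and the generator.
    static final AtomicLong SNAPSHOTS_TAKEN = new AtomicLong();

    // The enumerator state is modeled as a single Long.
    static final class CountingSerializer implements VersionedSerializer<Long> {
        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(Long state) {
            // Side effect: record that a checkpoint snapshot was requested.
            SNAPSHOTS_TAKEN.incrementAndGet();
            return ByteBuffer.allocate(Long.BYTES).putLong(state).array();
        }

        @Override
        public Long deserialize(int version, byte[] serialized) {
            return ByteBuffer.wrap(serialized).getLong();
        }
    }

    // Generator function tags each record with the snapshot count seen so far.
    static final LongFunction<String> GENERATOR =
            index -> "record-" + index + "@snapshot-" + SNAPSHOTS_TAKEN.get();

    public static void main(String[] args) {
        CountingSerializer serializer = new CountingSerializer();
        System.out.println(GENERATOR.apply(0)); // record-0@snapshot-0
        serializer.serialize(1L);               // simulated checkpoint request
        System.out.println(GENERATOR.apply(1)); // record-1@snapshot-1
    }
}
```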

In general, regardless of the generator implementation itself, the new
Source API does not seem to support the use case of verifying checkpoint
contents in lockstep with the produced data; at least, I do not see an
immediate solution for this. Can you think of a different way of checking
the correctness of the Iceberg Sink implementation that does not rely on
this approach?

Best,
Alexander Fedulov

[1] https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
[2] https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97

On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:

In the Iceberg source, we have a test data generator source that controls
which records are emitted in each checkpoint cycle. Can we support something
like this in the DataGeneratorSource?


https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)

Thanks,
Steven

On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi everyone,

I would like to open a discussion on FLIP-238: Introduce FLIP-27-based
Data Generator Source [1]. During the discussion about deprecating the
SourceFunction API [2], it became evident that an easy-to-use
FLIP-27-compatible data generator source is needed so that the current
SourceFunction-based data generator implementations can be phased out for
both Flink demo/PoC applications and the internal Flink tests. This FLIP
proposes to introduce a generic DataGeneratorSource capable of producing
events of an arbitrary type based on a user-supplied MapFunction.
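For readers new to the proposal, here is a rough, Flink-free sketch of what
"events of an arbitrary type based on a user-supplied MapFunction" can boil
down to: a bounded range of Long indices, split across parallel readers,
with each index mapped to an event. java.util.function.LongFunction stands
in for Flink's MapFunction<Long, OUT>; the contiguous-range splitting scheme
and all names are assumptions of this sketch, not taken from the FLIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Flink-free model of a generic generator source; names are hypothetical.
class GenericGeneratorDemo {
    // Inclusive index range [from, to] handled by one parallel reader.
    record IndexSplit(long from, long to) {}

    // Divides [0, count) into `parallelism` contiguous ranges of near-equal
    // size (fine as long as count * parallelism fits in a long).
    static List<IndexSplit> split(long count, int parallelism) {
        List<IndexSplit> splits = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            long from = count * i / parallelism;
            long to = count * (i + 1) / parallelism - 1;
            if (from <= to) {
                splits.add(new IndexSplit(from, to));
            }
        }
        return splits;
    }

    // Each reader maps its own indices to events with the user function.
    static <OUT> List<OUT> read(IndexSplit split, LongFunction<OUT> generatorFunction) {
        List<OUT> out = new ArrayList<>();
        for (long i = split.from(); i <= split.to(); i++) {
            out.add(generatorFunction.apply(i));
        }
        return out;
    }

    public static void main(String[] args) {
        LongFunction<String> toEvent = i -> "Number: " + i;
        for (IndexSplit s : split(10, 3)) {
            System.out.println(s + " -> " + read(s, toEvent));
        }
    }
}
```

Keeping the user function a pure mapping from index to event means the
source itself only has to track which indices were emitted, which keeps the
checkpointed state small.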

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/9Av1D
[2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9

Best,
Alexander Fedulov


