Hi Alex,

Thanks for creating the FLIP and opening up the discussion. +1 overall for
getting this in place.

One question: you've already mentioned that this FLIP focuses on the DataStream
API. I think it would be a bit confusing to have a Datagen connector (on the
Table side) that doesn't leverage this target interface. It would be good if
we could already have one generic Datagen connector: a new one in the Flink
repo for the DataStream API, with the Datagen connector in the Table landscape
built on this same target interface. What do you think?
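To make the "one generic connector" idea concrete, here is a minimal, Flink-free sketch. It assumes a single generator function (`java.util.function.LongFunction<T>`, index to event) shared by both APIs: the DataStream side emits the typed events directly, while a Table-side adapter converts the same events into rows (`Object[]` stands in for `RowData`). `SharedDatagen` and its methods are hypothetical names for illustration, not existing Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongFunction;

// Hypothetical model (not Flink API): one generator function backs both the
// "DataStream" path (typed events) and the "Table" path (rows).
final class SharedDatagen<T> {
    private final LongFunction<T> generator; // index -> event
    private final long count;                // number of events to produce

    SharedDatagen(LongFunction<T> generator, long count) {
        this.generator = generator;
        this.count = count;
    }

    // DataStream-style consumption: emit the typed events as-is.
    List<T> generate() {
        List<T> out = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            out.add(generator.apply(i));
        }
        return out;
    }

    // Table-style consumption: reuse the same generator and convert each event
    // into a row (Object[] stands in for RowData here).
    List<Object[]> generateRows(Function<T, Object[]> toRow) {
        List<Object[]> rows = new ArrayList<>();
        for (T event : generate()) {
            rows.add(toRow.apply(event));
        }
        return rows;
    }
}

final class SharedDatagenDemo {
    // Hypothetical event type used only for this demo.
    static final class Order {
        final long id;
        final String product;

        Order(long id, String product) {
            this.id = id;
            this.product = product;
        }

        @Override
        public String toString() {
            return "Order(" + id + ", " + product + ")";
        }
    }

    public static void main(String[] args) {
        SharedDatagen<Order> datagen =
                new SharedDatagen<>(i -> new Order(i, "product-" + i), 3);
        // DataStream-style: typed events.
        System.out.println(datagen.generate());
        // Table-style: the same events as rows.
        for (Object[] row : datagen.generateRows(o -> new Object[] {o.id, o.product})) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

The design choice mirrors the layering mentioned later in the thread (a Table source reusing a DataStream source under the hood): the generation logic lives in one place, and only the row conversion is Table-specific.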

Best regards,

Martijn

On Wed, Jun 8, 2022 at 14:21 Alexander Fedulov <alexan...@ververica.com>
wrote:

> Hi Xianxun,
>
> Thanks for bringing it up. I do believe it would be useful to have such a
> CDC data generator, but I see the effort to provide one as somewhat
> orthogonal to the DataGeneratorSource proposed in the FLIP. FLIP-238
> focuses on the DataStream API, and I see integration into the Table/SQL
> ecosystem as the next step, which I would prefer to keep separate (see
> KafkaDynamicSource reusing KafkaSource<RowData> under the hood [1]).
>
> [1]
> https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
>
> Best,
> Alexander Fedulov
>
> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:
>
> > Hey Alexander,
> >
> > Making the datagen source connector easier to use is really helpful when
> > doing a PoC or demo.
> > I was also wondering whether it would be possible to produce a changelog
> > stream with the datagen source, so that new Flink developers could
> > practice Flink SQL with CDC data using the Flink SQL Client CLI.
> > In the flink-examples-table module, the ChangelogSocketExample class [1]
> > shows how to ingest delete or insert data with the 'nc' command. Could we
> > support producing a changelog stream with the new datagen source?
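One way such a changelog generator could look is sketched below, assuming the FLIP's "index to event" mapping style. The local `ChangeKind` enum mirrors the four values of Flink's `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) so the sketch compiles without Flink; `ChangeEvent`, `ChangelogGenerator`, and the per-key lifecycle pattern are hypothetical choices for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Mirrors the four values of Flink's RowKind; defined locally so the sketch
// compiles without Flink on the classpath.
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical changelog record: a change kind plus a key/value payload.
final class ChangeEvent {
    final ChangeKind kind;
    final long key;
    final String value;

    ChangeEvent(ChangeKind kind, long key, String value) {
        this.kind = kind;
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return kind + "(" + key + ", " + value + ")";
    }
}

final class ChangelogGenerator {
    // Hypothetical index -> event mapping: every key goes through
    // INSERT, UPDATE_BEFORE, UPDATE_AFTER and DELETE, in that order.
    static final LongFunction<ChangeEvent> CHANGELOG = index -> {
        long key = index / 4;
        switch ((int) (index % 4)) {
            case 0: return new ChangeEvent(ChangeKind.INSERT, key, "v0");
            case 1: return new ChangeEvent(ChangeKind.UPDATE_BEFORE, key, "v0");
            case 2: return new ChangeEvent(ChangeKind.UPDATE_AFTER, key, "v1");
            default: return new ChangeEvent(ChangeKind.DELETE, key, "v1");
        }
    };

    // Applies the mapping to indices 0..count-1.
    static List<ChangeEvent> generate(long count) {
        List<ChangeEvent> events = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            events.add(CHANGELOG.apply(i));
        }
        return events;
    }

    public static void main(String[] args) {
        generate(8).forEach(System.out::println);
    }
}
```

Because each key's events appear in a valid order (insert before update, update before delete), a downstream SQL query would see a consistent changelog; a real implementation would still need to convert these into Table rows with the proper row kind.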
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
> >
> > Best regards,
> >
> > Xianxun
> >
> > On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:
> >
> > I looked a bit further, and it seems it should actually be easier than I
> > initially thought: SourceReader extends the CheckpointListener interface,
> > and with a custom implementation it should be possible to achieve similar
> > results. A prototype that I have for the generator uses an
> > IteratorSourceReader under the hood by default, but we could consider
> > adding the ability to supply something like a
> > DataGeneratorSourceReaderFactory that would allow provisioning the
> > DataGeneratorSource with customized implementations for cases like this.
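A minimal, Flink-free model of such a checkpoint-aware reader is sketched below. It assumes batches are supplied like BoundedTestSource's `List<List<T>> elementsPerCheckpoint` and releases the next batch only when a checkpoint completes. `CheckpointNotifications` is a local stand-in for the `notifyCheckpointComplete(long)` callback of Flink's `CheckpointListener`; the real `SourceReader` uses `pollNext(ReaderOutput)` returning an `InputStatus` plus `isAvailable()`, which this sketch simplifies to a nullable `pollNext()`. `CheckpointGatedReader` is a hypothetical name, and barrier alignment is not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Local stand-in for the callback Flink's CheckpointListener provides
// (notifyCheckpointComplete); defined here so the sketch runs without Flink.
interface CheckpointNotifications {
    void notifyCheckpointComplete(long checkpointId);
}

// Hypothetical reader: releases one batch of records per completed checkpoint,
// similar in spirit to BoundedTestSource's elementsPerCheckpoint.
final class CheckpointGatedReader<T> implements CheckpointNotifications {
    private final Deque<List<T>> pendingBatches;
    private final Deque<T> current = new ArrayDeque<>();
    private long lastCompletedCheckpoint = -1;

    CheckpointGatedReader(List<List<T>> elementsPerCheckpoint) {
        this.pendingBatches = new ArrayDeque<>(elementsPerCheckpoint);
        releaseNextBatch(); // the first batch is available before any checkpoint
    }

    // Simplified poll: returns the next record, or null if the current batch is
    // exhausted and the next one is still waiting for a checkpoint to complete.
    T pollNext() {
        return current.pollFirst();
    }

    boolean isFinished() {
        return current.isEmpty() && pendingBatches.isEmpty();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Ignore duplicate or out-of-order notifications; only a newer checkpoint
        // that completes after the current batch was fully emitted unlocks the next one.
        if (checkpointId > lastCompletedCheckpoint && current.isEmpty()) {
            lastCompletedCheckpoint = checkpointId;
            releaseNextBatch();
        }
    }

    private void releaseNextBatch() {
        List<T> next = pendingBatches.pollFirst();
        if (next != null) {
            current.addAll(next);
        }
    }
}

final class CheckpointGatedReaderDemo {
    public static void main(String[] args) {
        CheckpointGatedReader<String> reader =
                new CheckpointGatedReader<>(List.of(List.of("a", "b"), List.of("c")));
        long checkpointId = 1;
        while (!reader.isFinished()) {
            String record;
            while ((record = reader.pollNext()) != null) {
                System.out.println("emit " + record);
            }
            // In Flink, this callback would come from the runtime after a checkpoint completes.
            reader.notifyCheckpointComplete(checkpointId++);
        }
    }
}
```

A factory along the lines of the DataGeneratorSourceReaderFactory idea would let a test plug in a reader like this instead of the default iterator-based one.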
> >
> > Best,
> > Alexander Fedulov
> >
> > On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com>
> > wrote:
> >
> > Hi Steven,
> >
> > This is going to be tricky, since in the new Source API the checkpointing
> > aspects that you based your logic on are pushed further away from the
> > low-level interfaces responsible for handling data and splits [1]. At the
> > same time, the SourceCoordinatorProvider is hardwired into the internals
> > of the framework, so I don't think it will be possible to provide a
> > customized implementation for testing purposes.
> >
> > The only way I currently see to tie data generation to checkpointing in
> > the new Source API is via the SplitEnumerator serializer
> > (getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
> > possible to share a variable visible both to the generator function and
> > to the serializer and manipulate it whenever the serialize() method gets
> > called upon a checkpoint request. That said, you still won't get the
> > notifications of successful checkpoints that you currently use (this info
> > is only available to the SourceCoordinator).
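The shared-variable workaround can be modeled as follows: an `AtomicLong` is visible to both the generator function and the enumerator-checkpoint serializer, and each `serialize()` call is treated as "a checkpoint is being taken". `EnumStateSerializer` is a local stand-in with the same shape as Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize`); `CheckpointCountingSerializer` is hypothetical. Since the enumerator runs in the SourceCoordinator on the JobManager while generator functions run in readers on TaskManagers, a plain shared variable presumably only works when both share a JVM, as in a local MiniCluster test.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

// Local stand-in with the same shape as Flink's SimpleVersionedSerializer.
interface EnumStateSerializer<E> {
    int getVersion();
    byte[] serialize(E state) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical serializer for an enumerator state that is just a Long; every
// serialize() call bumps a counter shared with the generator function.
final class CheckpointCountingSerializer implements EnumStateSerializer<Long> {
    private final AtomicLong checkpointsSeen; // shared with the generator function

    CheckpointCountingSerializer(AtomicLong checkpointsSeen) {
        this.checkpointsSeen = checkpointsSeen;
    }

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(Long state) {
        checkpointsSeen.incrementAndGet(); // side effect: "a checkpoint is being taken"
        return ByteBuffer.allocate(Long.BYTES).putLong(state).array();
    }

    @Override
    public Long deserialize(int version, byte[] serialized) {
        return ByteBuffer.wrap(serialized).getLong();
    }
}

final class SerializerHookDemo {
    public static void main(String[] args) {
        AtomicLong checkpointsSeen = new AtomicLong();
        CheckpointCountingSerializer serializer = new CheckpointCountingSerializer(checkpointsSeen);
        // The generator tags each record with the number of checkpoints observed so far.
        LongFunction<String> generator = i -> "cp" + checkpointsSeen.get() + "-r" + i;

        System.out.println(generator.apply(0)); // prints "cp0-r0"
        serializer.serialize(42L);              // simulates a checkpoint request
        System.out.println(generator.apply(1)); // prints "cp1-r1"
    }
}
```

As noted above, this only observes that a checkpoint was triggered, not that it completed successfully.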
> >
> > In general, regardless of the generator implementation itself, the new
> > Source API does not seem to support the use case of verifying checkpoint
> > contents in lockstep with produced data; at least I do not see an
> > immediate solution for this. Can you think of a different way of checking
> > the correctness of the Iceberg Sink implementation that does not rely on
> > this approach?
> >
> > Best,
> > Alexander Fedulov
> >
> > [1]
> > https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
> >
> > [2]
> > https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
> >
> > On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > In the Iceberg source, we have a data generator source that can control
> > the number of records per checkpoint cycle. Can we support something like
> > this in the DataGeneratorSource?
> >
> > https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> > public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)
> >
> > Thanks,
> > Steven
> >
> > On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com>
> > wrote:
> >
> > Hi everyone,
> >
> > I would like to open a discussion on FLIP-238: Introduce FLIP-27-based
> > Data Generator Source [1]. During the discussion about deprecating the
> > SourceFunction API [2], it became evident that an easy-to-use
> > FLIP-27-compatible data generator source is needed so that the current
> > SourceFunction-based data generator implementations can be phased out,
> > both for Flink demo/PoC applications and for the internal Flink tests.
> > This FLIP proposes to introduce a generic DataGeneratorSource capable of
> > producing events of an arbitrary type based on a user-supplied
> > MapFunction.
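A rough model of how a generic, map-function-based generator could be parallelized is sketched below. It assumes the user function maps a `long` index to an event (here `java.util.function.LongFunction<OUT>` stands in for a Flink `MapFunction<Long, OUT>`) and that the index range `[0, count)` is cut into contiguous splits, loosely similar to how Flink's NumberSequenceSource splits its range. `IndexRangeGenerator`, `splits`, and `read` are hypothetical names, not the FLIP's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical model: the index range [0, count) is cut into contiguous splits,
// and each split is turned into events by applying the user's function.
final class IndexRangeGenerator<OUT> {
    private final LongFunction<OUT> generatorFunction; // stands in for MapFunction<Long, OUT>
    private final long count;

    IndexRangeGenerator(LongFunction<OUT> generatorFunction, long count) {
        this.generatorFunction = generatorFunction;
        this.count = count;
    }

    // Splits [0, count) into at most `parallelism` (> 0) non-empty [from, to) ranges;
    // the first count % parallelism splits get one extra index.
    List<long[]> splits(int parallelism) {
        List<long[]> result = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long from = 0;
        for (int i = 0; i < parallelism && from < count; i++) {
            long size = base + (i < remainder ? 1 : 0);
            result.add(new long[] {from, from + size});
            from += size;
        }
        return result;
    }

    // What a single reader would do with its split: map every index to an event.
    List<OUT> read(long[] split) {
        List<OUT> events = new ArrayList<>();
        for (long index = split[0]; index < split[1]; index++) {
            events.add(generatorFunction.apply(index));
        }
        return events;
    }
}

final class IndexRangeGeneratorDemo {
    public static void main(String[] args) {
        IndexRangeGenerator<String> gen = new IndexRangeGenerator<>(i -> "event-" + i, 5);
        for (long[] split : gen.splits(2)) {
            // prints "[event-0, event-1, event-2]" and then "[event-3, event-4]"
            System.out.println(gen.read(split));
        }
    }
}
```

Keeping the user function a pure mapping from index to event makes the output deterministic per index, so a restarted reader could in principle resume from a checkpointed index without replaying state.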
> >
> > Looking forward to your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/x/9Av1D
> > [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> >
> > Best,
> > Alexander Fedulov
> >
>
