I'm very happy with this. +1

A lot of the SourceFunction implementations used in demos and PoCs include a
call to sleep(), so adding rate limiting is a good idea, in my opinion.
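
To make that concrete, here is a minimal sketch of the kind of throttling that
could replace hand-written sleep() calls. It is plain Java and not tied to any
Flink interface; SimpleRateLimiter, reserve() and acquire() are names I made up
for illustration, and the injectable clock is only there to make it testable.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper (not a Flink API): spaces out emissions to a target rate. */
final class SimpleRateLimiter {
    private final long nanosPerRecord;
    private final LongSupplier clock; // System::nanoTime in practice, a fake clock in tests
    private long nextFreeNanos;       // earliest time at which the next record may be emitted

    SimpleRateLimiter(double recordsPerSecond, LongSupplier clock) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("recordsPerSecond must be positive");
        }
        this.nanosPerRecord = (long) (1_000_000_000L / recordsPerSecond);
        this.clock = clock;
        this.nextFreeNanos = clock.getAsLong();
    }

    /** Reserves the next emission slot and returns how long the caller has to wait for it. */
    long reserve() {
        long now = clock.getAsLong();
        long slotStart = Math.max(now, nextFreeNanos);
        nextFreeNanos = slotStart + nanosPerRecord;
        return slotStart - now;
    }

    /** Blocks until the next record may be emitted. */
    void acquire() throws InterruptedException {
        long waitNanos = reserve();
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

A generator loop would call acquire() before emitting each record, e.g. with
new SimpleRateLimiter(10.0, System::nanoTime) for roughly 10 records per second.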

Best,
David

On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren <renqs...@gmail.com> wrote:

> Hi Alexander,
>
> Thanks for creating this FLIP! I’d like to share some thoughts.
>
> 1. About the “generatorFunction”: I’d expect an initializer on it, because
> it’s hard to require that all fields of the generator function be
> serializable in the user’s implementation. Providing a method like “open” in
> the interface would let the function do some initialization in the task
> initialization stage.
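
(Inline: a minimal sketch of how I read this point. OpenableGenerator,
HashingGenerator and shipToTask are hypothetical names in plain Java, not the
interface proposed in the FLIP. The non-serializable MessageDigest is marked
transient and only created in open(), after the function has been shipped.)

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical interface for illustration only. */
interface OpenableGenerator<T> extends Serializable {
    /** Called once on the task side before the first map() call. */
    default void open() {}

    T map(long index);
}

/** Keeps its non-serializable helper out of the serialized form. */
final class HashingGenerator implements OpenableGenerator<String> {
    private static final long serialVersionUID = 1L;
    private final String prefix;            // serializable configuration
    private transient MessageDigest digest; // not serializable, created in open()

    HashingGenerator(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public void open() {
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public String map(long index) {
        byte[] hash = digest.digest((prefix + index).getBytes(StandardCharsets.UTF_8));
        return prefix + index + ":" + (hash[0] & 0xff);
    }
}

final class OpenableGeneratorDemo {
    /** Simulates shipping the function to a task: serialize, deserialize, then open(). */
    static <T> OpenableGenerator<T> shipToTask(OpenableGenerator<T> fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                OpenableGenerator<T> copy = (OpenableGenerator<T>) in.readObject();
                copy.open();
                return copy;
            }
        } catch (Exception e) {
            throw new IllegalStateException("could not ship generator function", e);
        }
    }
}
```
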
>
> 2. As for the throttling functionality you mentioned, there’s a
> FlinkConnectorRateLimiter under flink-core, and maybe we could reuse this
> interface. Actually, I’d prefer to make rate limiting a common feature
> provided by the Source API, but this requires another FLIP and a lot of
> discussion, so I’m OK with having it in the DataGen source first.
>
> Best regards,
> Qingsheng
>
>
> > On Jun 17, 2022, at 01:47, Alexander Fedulov <alexan...@ververica.com> wrote:
> >
> > Hi Jing,
> >
> > thanks for your thorough analysis. I agree with the points you make and
> > also with the idea to approach the larger task of providing a universal
> > (DataStream + SQL) data generator base iteratively.
> > Regarding the name, the SourceFunction-based *DataGeneratorSource* resides
> > in *org.apache.flink.streaming.api.functions.source.datagen*. I think
> > it is OK to simply place the new one (with the same name) next to the
> > *NumberSequenceSource* in *org.apache.flink.api.connector.source.lib*.
> >
> > One more thing I wanted to discuss: I noticed that *DataGenTableSource* has
> > built-in throttling functionality (*rowsPerSecond*). I believe it could
> > also be useful for the DataStream users of the stateless data generator,
> > and since we want to eventually converge on the same implementation for
> > DataStream and Table/SQL, it sounds like a good idea to add it to the FLIP.
> > What do you think?
> >
> > Best,
> > Alexander Fedulov
> >
> >
> > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge <j...@ververica.com> wrote:
> >
> >> Hi,
> >>
> >> After reading all discussions posted in this thread and the source code of
> >> DataGeneratorSource, which unfortunately used "Source" instead of
> >> "SourceFunction" in its name, the issues could be summarized as follows:
> >>
> >> 1. The current DataGeneratorSource based on SourceFunction is a stateful
> >> source connector and built for Table/SQL.
> >> 2. The right name for the new data generator source, i.e.
> >> DataGeneratorSource, has been used for the current implementation based on
> >> SourceFunction.
> >> 3. A new data generator source should be developed based on the new Source
> >> API.
> >> 4. The new data generator source should be used both for DataStream and
> >> Table/SQL, which means the current DataGeneratorSource should be replaced
> >> with the new one.
> >> 5. The core event generation logic should be pluggable to support various
> >> (test) scenarios, e.g. random stream, changelog stream, controllable events
> >> per checkpoint, etc.
> >>
> >> which means that:
> >>
> >> To solve 1+3+4 -> we will have to make a big effort to replace the current
> >> DataGeneratorSource, since the new Source API has a very different
> >> concept, especially for the stateful part.
> >> To solve 2+3 -> we have to find another name for the new implementation.
> >> To solve 1+3+4+5 -> it gets even more complicated to support stateless and
> >> stateful scenarios simultaneously with one solution.
> >>
> >> If we want to solve all of these issues in one shot, it might take months.
> >> Therefore, I would suggest starting small and growing iteratively.
> >>
> >> The proposal for the kickoff is to focus on stateless event generation
> >> with e.g. a random stream and use the name "StatelessDataGeneratorSource".
> >> There will be a period of time in which both DataGeneratorSources are used
> >> by developers. The current DataGeneratorSource will then be deprecated,
> >> once we can (iteratively):
> >> 1. either enlarge the scope of StatelessDataGeneratorSource to be able to
> >> cover stateful scenarios and rename it to "DataGeneratorSourceV2"
> >> (following the naming convention of SinkV2), or
> >> 2. develop a new "StatefulDataGeneratorSource" based on the Source API which
> >> can handle the stateful scenarios, if it is impossible to support both
> >> stateless and stateful scenarios with one GeneratorSource implementation.
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <martijnvis...@apache.org>
> >> wrote:
> >>
> >>> Hey Alex,
> >>>
> >>> Yes, I think we need to make sure that we're not causing confusion (I know
> >>> I already was confused). I think DataSupplierSource is already better,
> >>> but perhaps there are others who have an even better idea.
> >>>
> >>> Thanks,
> >>>
> >>> Martijn
> >>>
> >>> On Thu, Jun 9, 2022 at 14:28, Alexander Fedulov <
> >>> alexan...@ververica.com> wrote:
> >>>
> >>>> Hi Martijn,
> >>>>
> >>>> It seems that they serve somewhat different purposes, though. The
> >>>> DataGenTableSource is for generating random data described by the Table
> >>>> DDL and is tied into the RowDataGenerator/DataGenerator concept, which is
> >>>> implemented as an Iterator<T>. The proposed API, in contrast, is supposed
> >>>> to provide users with an easy way to supply their custom data. Another
> >>>> difference is that a DataGenerator is supposed to be stateful and has to
> >>>> snapshot its state, whereas the proposed API is purely driven by the
> >>>> input index IDs and can be stateless yet remain deterministic. Are you
> >>>> sure it is a good idea to mix them into the same API? We could think of
> >>>> using a different name to make it less confusing for the users (something
> >>>> like DataSupplierSource).
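
(Inline: the "stateless yet deterministic" property can be sketched in plain
Java as below. IndexDrivenGeneratorSketch and EVENT_FOR_INDEX are hypothetical
names, and I am assuming the source itself keeps track of the current index, so
that only this position would have to be restored after a failure.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

final class IndexDrivenGeneratorSketch {
    /** Derives an event purely from its index, so the function itself holds no state. */
    static final LongFunction<String> EVENT_FOR_INDEX =
            index -> "user-" + (index % 3) + "@" + (index * 10);

    /** Emits the events for all indices in [from, to). */
    static List<String> generate(long from, long to, LongFunction<String> fn) {
        List<String> out = new ArrayList<>();
        for (long i = from; i < to; i++) {
            out.add(fn.apply(i));
        }
        return out;
    }
}
```

Generating indices 0 to 6 in one go yields the same records as generating 0 to
4, "restarting", and continuing with 4 to 6, without snapshotting anything
inside the function.
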
> >>>>
> >>>> Best,
> >>>> Alexander Fedulov
> >>>>
> >>>> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser <martijnvis...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Hi Alex,
> >>>>>
> >>>>> Thanks for creating the FLIP and opening up the discussion. +1 overall for
> >>>>> getting this in place.
> >>>>>
> >>>>> One question: you've already mentioned that this is focused on the
> >>>>> DataStream API. I think it would be a bit confusing that we have a Datagen
> >>>>> connector (on the Table side) that wouldn't leverage this target interface.
> >>>>> I think it would be good if we could already have one generic Datagen
> >>>>> connector which works for the DataStream API (so that would be a new one in
> >>>>> the Flink repo), and that the Datagen in the Table landscape uses this
> >>>>> target interface too. What do you think?
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Martijn
> >>>>>
> >>>>> On Wed, Jun 8, 2022 at 14:21, Alexander Fedulov <
> >>>>> alexan...@ververica.com> wrote:
> >>>>>
> >>>>>> Hi Xianxun,
> >>>>>>
> >>>>>> Thanks for bringing it up. I do believe it would be useful to have such a
> >>>>>> CDC data generator, but I see the efforts to provide one as a bit
> >>>>>> orthogonal to the DataGeneratorSource proposed in the FLIP. FLIP-238
> >>>>>> focuses on the DataStream API, and I could see integration into the
> >>>>>> Table/SQL ecosystem as the next step that I would prefer to keep separate
> >>>>>> (see KafkaDynamicSource reusing KafkaSource<RowData> under the hood [1]).
> >>>>>>
> >>>>>> [1]
> >>>>>> https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
> >>>>>>
> >>>>>> Best,
> >>>>>> Alexander Fedulov
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:
> >>>>>>
> >>>>>>> Hey Alexander,
> >>>>>>>
> >>>>>>> Making the datagen source connector easier to use is really helpful when
> >>>>>>> doing a PoC/demo.
> >>>>>>> I also wondered whether it is possible to produce a changelog stream with
> >>>>>>> the datagen source, so that a new Flink developer can practice Flink SQL
> >>>>>>> with CDC data using the Flink SQL Client CLI.
> >>>>>>> In the flink-examples-table module, the ChangelogSocketExample class [1]
> >>>>>>> describes how to ingest delete or insert data with the 'nc' command. Can we
> >>>>>>> support producing a changelog stream with the new datagen source?
> >>>>>>>
> >>>>>>> [1]
> >>>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Xianxun
> >>>>>>>
> >>>>>>> On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:
> >>>>>>>
> >>>>>>> I looked a bit further and it seems it should actually be easier than I
> >>>>>>> initially thought: SourceReader extends the CheckpointListener interface,
> >>>>>>> and with a custom implementation of it, it should be possible to achieve
> >>>>>>> similar results. A prototype that I have for the generator uses an
> >>>>>>> IteratorSourceReader under the hood by default, but we could consider
> >>>>>>> adding the ability to supply something like a
> >>>>>>> DataGeneratorSourceReaderFactory that would allow provisioning the
> >>>>>>> DataGeneratorSource with customized implementations for cases like this.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Alexander Fedulov
> >>>>>>>
> >>>>>>> On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi Steven,
> >>>>>>>
> >>>>>>> This is going to be tricky, since in the new Source API the checkpointing
> >>>>>>> aspects that you based your logic on are pushed further away from the
> >>>>>>> low-level interfaces responsible for handling data and splits [1]. At the
> >>>>>>> same time, the SourceCoordinatorProvider is hardwired into the internals
> >>>>>>> of the framework, so I don't think it will be possible to provide a
> >>>>>>> customized implementation for testing purposes.
> >>>>>>>
> >>>>>>> The only chance to tie data generation to checkpointing in the new Source
> >>>>>>> API that I see at the moment is via the SplitEnumerator serializer (the
> >>>>>>> getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
> >>>>>>> possible to share a variable visible both to the generator function and to
> >>>>>>> the serializer and manipulate it whenever the serialize() method gets
> >>>>>>> called upon a checkpoint request. That said, you still won't get the
> >>>>>>> notifications of successful checkpoints that you currently use (this info
> >>>>>>> is only available to the SourceCoordinator).
> >>>>>>>
> >>>>>>> In general, regardless of the generator implementation itself, the new
> >>>>>>> Source API does not seem to support the use case of verifying checkpoint
> >>>>>>> contents in lockstep with produced data; at least I do not see an
> >>>>>>> immediate solution for this. Can you think of a different way of checking
> >>>>>>> the correctness of the Iceberg Sink implementation that does not rely on
> >>>>>>> this approach?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Alexander Fedulov
> >>>>>>>
> >>>>>>> [1]
> >>>>>>> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
> >>>>>>>
> >>>>>>> [2]
> >>>>>>> https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
> >>>>>>>
> >>>>>>> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> In the Iceberg source, we have a data generator source that can control
> >>>>>>> the records per checkpoint cycle. Can we support something like this in
> >>>>>>> the DataGeneratorSource?
> >>>>>>>
> >>>>>>> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> >>>>>>>
> >>>>>>> public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean
> >>>>>>> checkpointEnabled)
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Steven
> >>>>>>>
> >>>>>>> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> I would like to open a discussion on FLIP-238: Introduce FLIP-27-based Data
> >>>>>>> Generator Source [1]. During the discussion about deprecating the
> >>>>>>> SourceFunction API [2] it became evident that an easy-to-use
> >>>>>>> FLIP-27-compatible data generator source is needed so that the current
> >>>>>>> SourceFunction-based data generator implementations could be phased out for
> >>>>>>> both Flink demo/PoC applications and for the internal Flink tests. This
> >>>>>>> FLIP proposes to introduce a generic DataGeneratorSource capable of
> >>>>>>> producing events of an arbitrary type based on a user-supplied MapFunction.
> >>>>>>>
> >>>>>>> Looking forward to your feedback.
> >>>>>>>
> >>>>>>> [1] https://cwiki.apache.org/confluence/x/9Av1D
> >>>>>>> [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Alexander Fedulov
> >>>>>>>
