I'm very happy with this. +1

A lot of SourceFunction implementations used in demos/PoC implementations
include a call to sleep(), so adding rate limiting is a good idea, in my
opinion.

Best,
David

On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren <renqs...@gmail.com> wrote:

> Hi Alexander,
>
> Thanks for creating this FLIP! I’d like to share some thoughts.
>
> 1. About the “generatorFunction”: I’m expecting an initializer on it,
> because it’s hard to require that all fields in the generator function
> are serializable in the user’s implementation. Providing a method like
> “open” in the interface would let the function perform its
> initialization in the task initialization stage.
>
> 2. As for the throttling functionality you mentioned, there’s a
> FlinkConnectorRateLimiter under flink-core and maybe we could reuse this
> interface. Actually, I would prefer to make rate limiting a common
> feature provided by the Source API, but this requires another FLIP and a
> lot of discussion, so I’m OK with having it in the DataGen source first.
>
> Best regards,
> Qingsheng
>
> > On Jun 17, 2022, at 01:47, Alexander Fedulov <alexan...@ververica.com>
> > wrote:
> >
> > Hi Jing,
> >
> > thanks for your thorough analysis. I agree with the points you make and
> > also with the idea to approach the larger task of providing a universal
> > (DataStream + SQL) data generator base iteratively.
> > Regarding the name, the SourceFunction-based *DataGeneratorSource*
> > resides in *org.apache.flink.streaming.api.functions.source.datagen*.
> > I think it is OK to simply place the new one (with the same name) next
> > to the *NumberSequenceSource* in
> > *org.apache.flink.api.connector.source.lib*.
> >
> > One more thing I wanted to discuss: I noticed that *DataGenTableSource*
> > has built-in throttling functionality (*rowsPerSecond*). I believe it
> > is something that could also be useful for the DataStream users of the
> > stateless data generator, and since we want to eventually converge on
> > the same implementation for DataStream and Table/SQL, it sounds like a
> > good idea to add it to the FLIP. What do you think?
> >
> > Best,
> > Alexander Fedulov
> >
> > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge <j...@ververica.com> wrote:
> >
> >> Hi,
> >>
> >> After reading all discussions posted in this thread and the source code
> >> of DataGeneratorSource, which unfortunately uses "Source" instead of
> >> "SourceFunction" in its name, the issues can be summarized as follows:
> >>
> >> 1. The current DataGeneratorSource based on SourceFunction is a
> >> stateful source connector and built for Table/SQL.
> >> 2. The right name for the new data generator source, i.e.
> >> DataGeneratorSource, has been used for the current implementation
> >> based on SourceFunction.
> >> 3. A new data generator source should be developed based on the new
> >> Source API.
> >> 4. The new data generator source should be used both for DataStream
> >> and Table/SQL, which means the current DataGeneratorSource should be
> >> replaced with the new one.
> >> 5. The core event generation logic should be pluggable to support
> >> various (test) scenarios, e.g. random stream, changelog stream,
> >> controllable events per checkpoint, etc.
> >>
> >> It turns out that:
> >>
> >> To solve 1+3+4 -> we will have to make a big effort to replace the
> >> current DataGeneratorSource, since the new Source API has a very
> >> different concept, especially for the stateful part.
> >> To solve 2+3 -> we have to find another name for the new implementation.
> >> To solve 1+3+4+5 -> it gets even more complicated to support stateless
> >> and stateful scenarios simultaneously with one solution.
> >>
> >> If we want to solve all of these issues in one shot, it might take
> >> months. Therefore, I would suggest starting small and growing
> >> iteratively.
> >>
> >> The proposal for the kickoff is to focus on stateless event generation
> >> with e.g. a random stream and use the name
> >> "StatelessDataGeneratorSource".
> >> There will be a period of time during which both DataGeneratorSource
> >> implementations will be used by developers.
> >> The current DataGeneratorSource will then be deprecated once we can
> >> (iteratively):
> >> 1. either enlarge the scope of StatelessDataGeneratorSource to be able
> >> to cover stateful scenarios and rename it to "DataGeneratorSourceV2"
> >> (following the naming convention of SinkV2), or
> >> 2. develop a new "StatefulDataGeneratorSource" based on the Source API
> >> which can handle the stateful scenarios, if it is impossible to
> >> support both stateless and stateful scenarios with one GeneratorSource
> >> implementation.
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <martijnvis...@apache.org>
> >> wrote:
> >>
> >>> Hey Alex,
> >>>
> >>> Yes, I think we need to make sure that we're not causing confusion (I
> >>> know I already was confused). I think the DataSupplierSource is
> >>> already better, but perhaps there are others who have an even better
> >>> idea.
> >>>
> >>> Thanks,
> >>>
> >>> Martijn
> >>>
> >>> On Thu, Jun 9, 2022 at 14:28, Alexander Fedulov
> >>> <alexan...@ververica.com> wrote:
> >>>
> >>>> Hi Martijn,
> >>>>
> >>>> It seems that they serve slightly different purposes though. The
> >>>> DataGenTableSource is for generating random data described by the
> >>>> Table DDL and is tied into the RowDataGenerator/DataGenerator
> >>>> concept, which is implemented as an Iterator<T>. The proposed API,
> >>>> in contrast, is supposed to provide users with an easy way to supply
> >>>> their custom data. Another difference is that a DataGenerator is
> >>>> supposed to be stateful and has to snapshot its state, whereas the
> >>>> proposed API is purely driven by the input index IDs and can be
> >>>> stateless yet remain deterministic. Are you sure it is a good idea
> >>>> to mix them into the same API? We could think of using a different
> >>>> name to make it less confusing for the users (something like
> >>>> DataSupplierSource).
> >>>>
> >>>> Best,
> >>>> Alexander Fedulov
> >>>>
> >>>> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser
> >>>> <martijnvis...@apache.org> wrote:
> >>>>
> >>>>> Hi Alex,
> >>>>>
> >>>>> Thanks for creating the FLIP and opening up the discussion. +1
> >>>>> overall for getting this in place.
> >>>>>
> >>>>> One question: you've already mentioned that this is focused on the
> >>>>> DataStream API. I think it would be a bit confusing that we have a
> >>>>> Datagen connector (on the Table side) that wouldn't leverage this
> >>>>> target interface. I think it would be good if we could already have
> >>>>> one generic Datagen connector which works for the DataStream API
> >>>>> (so that would be a new one in the Flink repo) and that the Datagen
> >>>>> in the Table landscape uses this target interface too. What do you
> >>>>> think?
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Martijn
> >>>>>
> >>>>> On Wed, Jun 8, 2022 at 14:21, Alexander Fedulov
> >>>>> <alexan...@ververica.com> wrote:
> >>>>>
> >>>>>> Hi Xianxun,
> >>>>>>
> >>>>>> Thanks for bringing it up. I do believe it would be useful to have
> >>>>>> such a CDC data generator, but I see the efforts to provide one as
> >>>>>> a bit orthogonal to the DataSourceGenerator proposed in the FLIP.
> >>>>>> FLIP-238 focuses on the DataStream API and I could see integration
> >>>>>> into the Table/SQL ecosystem as the next step that I would prefer
> >>>>>> to keep separate (see KafkaDynamicSource reusing
> >>>>>> KafkaSource<RowData> under the hood [1]).
> >>>>>>
> >>>>>> [1]
> >>>>>> https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
> >>>>>>
> >>>>>> Best,
> >>>>>> Alexander Fedulov
> >>>>>>
> >>>>>> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:
> >>>>>>
> >>>>>>> Hey Alexander,
> >>>>>>>
> >>>>>>> Making the datagen source connector easier to use is really
> >>>>>>> helpful when doing a PoC/demo.
> >>>>>>> I also wondered whether it is possible to produce a changelog
> >>>>>>> stream with the datagen source, so that a new Flink developer can
> >>>>>>> practice Flink SQL with CDC data using the Flink SQL Client CLI.
> >>>>>>> In the flink-examples-table module, a ChangelogSocketExample
> >>>>>>> class [1] describes how to ingest delete or insert data with the
> >>>>>>> 'nc' command. Can we support producing a changelog stream with
> >>>>>>> the new datagen source?
> >>>>>>>
> >>>>>>> [1]
> >>>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Xianxun
> >>>>>>>
> >>>>>>> On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>> I looked a bit further and it seems it should actually be easier
> >>>>>>> than I initially thought: SourceReader extends the
> >>>>>>> CheckpointListener interface, and with a custom implementation of
> >>>>>>> it, it should be possible to achieve similar results.
> >>>>>>> A prototype that I have for the generator uses an
> >>>>>>> IteratorSourceReader under the hood by default, but we could
> >>>>>>> consider adding the ability to supply something like a
> >>>>>>> DataGeneratorSourceReaderFactory that would allow provisioning the
> >>>>>>> DataGeneratorSource with customized implementations for cases like
> >>>>>>> this.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Alexander Fedulov
> >>>>>>>
> >>>>>>> On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov
> >>>>>>> <alexan...@ververica.com> wrote:
> >>>>>>>
> >>>>>>> Hi Steven,
> >>>>>>>
> >>>>>>> This is going to be tricky since in the new Source API the
> >>>>>>> checkpointing aspects that you based your logic on are pushed
> >>>>>>> further away from the low-level interfaces responsible for
> >>>>>>> handling data and splits [1]. At the same time, the
> >>>>>>> SourceCoordinatorProvider is hardwired into the internals of the
> >>>>>>> framework, so I don't think it will be possible to provide a
> >>>>>>> customized implementation for testing purposes.
> >>>>>>>
> >>>>>>> The only chance to tie data generation to checkpointing in the new
> >>>>>>> Source API that I see at the moment is via the SplitEnumerator
> >>>>>>> serializer (getEnumeratorCheckpointSerializer() method) [2]. In
> >>>>>>> theory, it should be possible to share a variable visible both to
> >>>>>>> the generator function and to the serializer and manipulate it
> >>>>>>> whenever the serialize() method gets called upon a checkpoint
> >>>>>>> request. That said, you still won't get notifications of
> >>>>>>> successful checkpoints that you currently use (this info is only
> >>>>>>> available to the SourceCoordinator).
> >>>>>>>
> >>>>>>> In general, regardless of the generator implementation itself, the
> >>>>>>> new Source API does not seem to support the use case of verifying
> >>>>>>> checkpoint contents in lockstep with produced data; at least I do
> >>>>>>> not see an immediate solution for this. Can you think of a
> >>>>>>> different way of checking the correctness of the Iceberg Sink
> >>>>>>> implementation that does not rely on this approach?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Alexander Fedulov
> >>>>>>>
> >>>>>>> [1]
> >>>>>>> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
> >>>>>>>
> >>>>>>> [2]
> >>>>>>> https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
> >>>>>>>
> >>>>>>> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>> In the Iceberg source, we have a data generator source that can
> >>>>>>> control the records per checkpoint cycle. Can we support something
> >>>>>>> like this in the DataGeneratorSource?
> >>>>>>>
> >>>>>>> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> >>>>>>> public BoundedTestSource(List<List<T>> elementsPerCheckpoint,
> >>>>>>> boolean checkpointEnabled)
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Steven
> >>>>>>>
> >>>>>>> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov
> >>>>>>> <alexan...@ververica.com> wrote:
> >>>>>>>
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> I would like to open a discussion on FLIP-238: Introduce
> >>>>>>> FLIP-27-based Data Generator Source [1]. During the discussion
> >>>>>>> about deprecating the SourceFunction API [2] it became evident
> >>>>>>> that an easy-to-use FLIP-27-compatible data generator source is
> >>>>>>> needed so that the current SourceFunction-based data generator
> >>>>>>> implementations could be phased out for both Flink demo/PoC
> >>>>>>> applications and for the internal Flink tests. This FLIP proposes
> >>>>>>> to introduce a generic DataGeneratorSource capable of producing
> >>>>>>> events of an arbitrary type based on a user-supplied MapFunction.
> >>>>>>>
> >>>>>>> Looking forward to your feedback.
> >>>>>>>
> >>>>>>> [1] https://cwiki.apache.org/confluence/x/9Av1D
> >>>>>>> [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Alexander Fedulov
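
For readers following the thread, three of the ideas discussed above (a generator driven purely by the input index, an "open"-style initializer for non-serializable fields, and rowsPerSecond-style throttling) can be illustrated with a minimal, dependency-free Java sketch. It deliberately uses no Flink classes: IndexedGenerator, RandomEventGenerator, ThrottledGeneratorDemo and generate() are hypothetical names made up for this illustration and are not the FLIP-238 API, and the throttle is a plain sleep-based loop rather than FlinkConnectorRateLimiter.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ThrottledGeneratorDemo {

    /** Hypothetical interface: maps an input index to an event. */
    interface IndexedGenerator<T> extends Serializable {
        /** Called once before the first map(); may create non-serializable helpers. */
        default void open() {}

        T map(long index);
    }

    /** Stateless yet deterministic: the output depends only on the seed and the index. */
    static class RandomEventGenerator implements IndexedGenerator<String> {
        private final long seed;
        private transient StringBuilder buffer; // not serialized; created in open()

        RandomEventGenerator(long seed) {
            this.seed = seed;
        }

        @Override
        public void open() {
            buffer = new StringBuilder();
        }

        @Override
        public String map(long index) {
            buffer.setLength(0);
            int value = new Random(seed + index).nextInt(1000);
            return buffer.append("event-").append(index).append(':').append(value).toString();
        }
    }

    /** Emits events for indices [from, to), pacing them to roughly rowsPerSecond. */
    static <T> List<T> generate(IndexedGenerator<T> fn, long from, long to, double rowsPerSecond) {
        fn.open();
        long nanosPerRow = (long) (1_000_000_000L / rowsPerSecond);
        long start = System.nanoTime();
        List<T> out = new ArrayList<>();
        for (long i = from; i < to; i++) {
            // Wait until this row's scheduled emission time has been reached.
            long wait = start + (i - from) * nanosPerRow - System.nanoTime();
            if (wait > 0) {
                try {
                    Thread.sleep(wait / 1_000_000, (int) (wait % 1_000_000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            out.add(fn.map(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> first = generate(new RandomEventGenerator(42L), 0, 5, 100.0);
        List<String> replay = generate(new RandomEventGenerator(42L), 0, 5, 100.0);
        System.out.println(first);
        // Same seed and same indices give the same events, with no snapshotted state.
        System.out.println(first.equals(replay)); // prints true
    }
}
```

Because each event is derived only from its index, replaying an index range after a restart reproduces the same events without any generator state to snapshot, which is the property described in the thread. Where the throttle would sit in an actual source implementation is left open here.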