Hi all,

I updated the FLIP [1] to make it more extensible with the introduction of *SourceReaderFactory*. It gives users the ability to further customize the data generation and emission process if needed. I also incorporated the suggestion from Qingsheng and moved to the generator function design with an initializer method to support more sophisticated functions with non-serializable fields. I am personally pretty happy with the current prototype [2], [3]. Let me know if you have any other feedback; otherwise, I am going to start the vote.
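To illustrate the idea of a generator function with an initializer method, here is a minimal, self-contained sketch in plain Java. It is not the API from the FLIP or the prototype: the names IndexGeneratorFunction, HashingGenerator and GeneratorDemo are hypothetical, and the round trip through Java serialization only stands in for shipping the function to a task. The point is that a non-serializable field (here a MessageDigest) can be declared transient and created in open() after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical interface (an assumption, not the FLIP's actual signature):
// a serializable function that maps a sequence index to a record.
interface IndexGeneratorFunction<T> extends Serializable {
    // Initializer hook, called once on the task before any record is produced.
    default void open() {}

    T map(long index);
}

// Example function holding a field that cannot be serialized.
class HashingGenerator implements IndexGeneratorFunction<String> {
    private static final long serialVersionUID = 1L;

    // MessageDigest is not Serializable, so it is transient and built in open().
    private transient MessageDigest digest;

    @Override
    public void open() {
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public String map(long index) {
        byte[] hash = digest.digest(Long.toString(index).getBytes(StandardCharsets.UTF_8));
        // The output depends only on the index, so the function stays stateless.
        return index + ":" + String.format("%02x", hash[0] & 0xff);
    }
}

final class GeneratorDemo {
    // Serializes and deserializes a value, standing in for shipping it to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Without the open() hook, the digest would have to be created in the constructor and the function could no longer be serialized.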
[1] https://cwiki.apache.org/confluence/x/9Av1D
[2] https://github.com/afedulov/flink/blob/e5f8e555543a67b9983c42b5db2a7617361fa459/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV4.java#L52
[3] https://github.com/afedulov/flink/blob/e5f8e555543a67b9983c42b5db2a7617361fa459/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourcePOC.java#L92

Best,
Alexander Fedulov

On Thu, Jul 7, 2022 at 12:08 AM Alexander Fedulov <alexan...@ververica.com> wrote:

> Hi Becket,
>
> interesting points about the discrepancies in the *RuntimeContext*
> "wrapping" throughout the framework, but I agree - this is something that
> needs to be tackled separately.
> For now, I adjusted the FLIP and the PoC implementation to only expose the
> parallelism.
>
> Best,
> Alexander Fedulov
>
> On Wed, Jul 6, 2022 at 2:42 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Alex,
>>
>> Personally I prefer the latter option, i.e. just add the
>> currentParallelism() method. It is easy to add more stuff to the
>> SourceReaderContext in the future, and it is likely that most of the stuff
>> in the RuntimeContext is not required by the SourceReader implementations.
>> For the purpose of this FLIP, adding the method is probably good enough.
>>
>> That said, I don't see a consistent pattern adopted in the project to
>> handle similar cases. The FunctionContext wraps the RuntimeContext and only
>> exposes necessary stuff. CEPRuntimeContext extends the RuntimeContext and
>> overrides some methods that it does not want to expose with exception
>> throwing logic. Some internal context classes simply expose the entire
>> RuntimeContext with some additional methods. If we want to make things
>> clean, I'd imagine all these variations of context can become some specific
>> combination of a ReadOnlyRuntimeContext and some "write" methods.
>> But this may require a closer look at all these cases to make sure the
>> ReadOnlyRuntimeContext is generally suitable. I feel that it will take some
>> time and could be a bigger discussion than the data generator source
>> itself. So maybe we can just go with adding a method at the moment and
>> evolve the SourceReaderContext to use the ReadOnlyRuntimeContext in the
>> future.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Jul 5, 2022 at 8:31 PM Alexander Fedulov <alexan...@ververica.com> wrote:
>>
>> > Hi Becket,
>> >
>> > I agree with you. We could introduce a *ReadOnlyRuntimeContext* that would
>> > act as a holder for the *RuntimeContext* data. This would also require
>> > read-only wrappers for the exposed fields, such as *ExecutionConfig*.
>> > Alternatively, we just add the *currentParallelism()* method for now and
>> > see if anything else might actually be needed later on. What do you think?
>> >
>> > Best,
>> > Alexander Fedulov
>> >
>> > On Tue, Jul 5, 2022 at 2:30 AM Becket Qin <becket....@gmail.com> wrote:
>> >
>> > > Hi Alex,
>> > >
>> > > While it is true that the RuntimeContext gives access to all the stuff the
>> > > framework can provide, it seems like overkill for the SourceReader.
>> > > It is probably OK to expose all the read-only information in the
>> > > RuntimeContext to the SourceReader, but we may want to hide the "write"
>> > > methods, such as creating states, writing stuff to the distributed cache,
>> > > etc., because these methods may not work well with the SourceReader design
>> > > and cause confusion. For example, users may wonder why the snapshotState()
>> > > method exists while they can use the state directly.
>> > > >> > > Thanks, >> > > >> > > Jiangjie (Becket) Qin >> > > >> > > >> > > >> > > On Tue, Jul 5, 2022 at 7:37 AM Alexander Fedulov < >> > alexan...@ververica.com> >> > > wrote: >> > > >> > > > Hi Becket, >> > > > >> > > > I updated and extended FLIP-238 accordingly. >> > > > >> > > > Here is also my POC branch [1]. >> > > > DataGeneratorSourceV3 is the class that I currently converged on >> [2]. >> > It >> > > is >> > > > based on the expanded SourceReaderContext. >> > > > A couple more relevant classes [3] [4] >> > > > >> > > > Would appreciate it if you could take a quick look. >> > > > >> > > > [1] >> > https://github.com/afedulov/flink/tree/FLINK-27919-generator-source >> > > > [2] >> > > > >> > > > >> > > >> > >> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java >> > > > [3] >> > > > >> > > > >> > > >> > >> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java >> > > > [4] >> > > > >> > > > >> > > >> > >> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java >> > > > >> > > > Best, >> > > > Alexander Fedulov >> > > > >> > > > On Mon, Jul 4, 2022 at 12:08 PM Alexander Fedulov < >> > > alexan...@ververica.com >> > > > > >> > > > wrote: >> > > > >> > > > > Hi Becket, >> > > > > >> > > > > Exposing the RuntimeContext is potentially even more useful. >> > > > > Do you think it is worth having both currentParallelism() and >> > > > > getRuntimeContext() methods? >> > > > > One can always call getNumberOfParallelSubtasks() on the >> > RuntimeContext >> > > > > directly if we expose it. 
>> > > > > >> > > > > Best, >> > > > > Alexander Fedulov >> > > > > >> > > > > >> > > > > On Mon, Jul 4, 2022 at 3:44 AM Becket Qin <becket....@gmail.com> >> > > wrote: >> > > > > >> > > > >> Hi Alex, >> > > > >> >> > > > >> Yes, that is what I had in mind. We need to add the method >> > > > >> getRuntimeContext() to the SourceReaderContext interface as well. >> > > > >> >> > > > >> Thanks, >> > > > >> >> > > > >> Jiangjie (Becket) Qin >> > > > >> >> > > > >> On Mon, Jul 4, 2022 at 3:01 AM Alexander Fedulov < >> > > > alexan...@ververica.com >> > > > >> > >> > > > >> wrote: >> > > > >> >> > > > >> > Hi Becket, >> > > > >> > >> > > > >> > thanks for your input. I like the idea of adding the >> parallelism >> > to >> > > > the >> > > > >> > SourceReaderContext. My understanding is that any change of >> > > > parallelism >> > > > >> > causes recreation of all readers, so it should be safe to >> consider >> > > it >> > > > >> > "fixed" after the readers' initialization. In that case, it >> should >> > > be >> > > > as >> > > > >> > simple as adding the following to the anonymous >> > SourceReaderContext >> > > > >> > implementation >> > > > >> > in SourceOperator#initReader(): >> > > > >> > >> > > > >> > public int currentParallelism() { >> > > > >> > return getRuntimeContext().getNumberOfParallelSubtasks(); >> > > > >> > } >> > > > >> > >> > > > >> > Is that what you had in mind? >> > > > >> > >> > > > >> > Best, >> > > > >> > Alexander Fedulov >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > On Fri, Jul 1, 2022 at 11:30 AM Becket Qin < >> becket....@gmail.com> >> > > > >> wrote: >> > > > >> > >> > > > >> > > Hi Alex, >> > > > >> > > >> > > > >> > > In FLIP-27 source, the SourceReader can get a >> > SourceReaderContext. >> > > > >> This >> > > > >> > is >> > > > >> > > passed in by the TM in Source#createReader(). And supposedly >> the >> > > > >> Source >> > > > >> > > should pass this to the SourceReader if needed. 
>> > > > >> > > >> > > > >> > > In the SourceReaderContext, currently only the index of the >> > > current >> > > > >> > subtask >> > > > >> > > is available, but we can probably add the current >> parallelism as >> > > > well. >> > > > >> > This >> > > > >> > > would be a change that affects all the Sources, not only for >> the >> > > > data >> > > > >> > > generator source. Perhaps we can have a simple separate FLIP. >> > > > >> > > >> > > > >> > > Regarding the semantic of rate limiting, for the rate limit >> > > source, >> > > > >> > > personally I feel intuitive to keep the global rate >> untouched on >> > > > >> scaling. >> > > > >> > > >> > > > >> > > Thanks, >> > > > >> > > >> > > > >> > > Jiangjie (Becket) Qin >> > > > >> > > >> > > > >> > > On Fri, Jul 1, 2022 at 4:00 AM Alexander Fedulov < >> > > > >> > alexan...@ververica.com> >> > > > >> > > wrote: >> > > > >> > > >> > > > >> > > > Hi all, >> > > > >> > > > >> > > > >> > > > getting back to the idea of reusing >> FlinkConnectorRateLimiter: >> > > it >> > > > is >> > > > >> > > > designed for the SourceFunction API and has an open() >> method >> > > that >> > > > >> > takes a >> > > > >> > > > RuntimeContext. Therefore, we need to add a different >> > interface >> > > > for >> > > > >> > > > the new Source >> > > > >> > > > API. >> > > > >> > > > >> > > > >> > > > This is where I see a certain limitation for the >> rate-limiting >> > > use >> > > > >> > case: >> > > > >> > > in >> > > > >> > > > the old API the individual readers were able to retrieve >> the >> > > > current >> > > > >> > > > parallelism from the RuntimeContext. In the new API, this >> is >> > not >> > > > >> > > supported, >> > > > >> > > > the information about the parallelism is only available in >> the >> > > > >> > > > SplitEnumeratorContext to which the readers do not have >> > access. >> > > > >> > > > >> > > > >> > > > I see two possibilities: >> > > > >> > > > 1. 
Add an optional RateLimiter parameter to the >> > > > DataGeneratorSource >> > > > >> > > > constructor. The RateLimiter is then "fixed" and has to be >> > fully >> > > > >> > > configured >> > > > >> > > > by the user in the main method. >> > > > >> > > > 2. Piggy-back on Splits: add parallelism as a field of a >> > Split. >> > > > The >> > > > >> > > > initialization of this field would happen dynamically upon >> > > splits >> > > > >> > > creation >> > > > >> > > > in the createEnumerator() method where currentParallelism >> is >> > > > >> available. >> > > > >> > > > >> > > > >> > > > The second approach makes implementation rather >> significantly >> > > more >> > > > >> > > > complex since we cannot simply wrap >> > > > >> > NumberSequenceSource.SplitSerializer >> > > > >> > > in >> > > > >> > > > that case. The advantage of this approach is that with any >> > kind >> > > of >> > > > >> > > > autoscaling, the source rate will match the original >> > > > configuration. >> > > > >> But >> > > > >> > > I'm >> > > > >> > > > not sure how useful this is. I can even imagine scenarios >> > where >> > > > >> scaling >> > > > >> > > the >> > > > >> > > > input rate together with parallelism would be better for >> demo >> > > > >> purposes. >> > > > >> > > > >> > > > >> > > > Would be glad to hear your thoughts on this. >> > > > >> > > > >> > > > >> > > > Best, >> > > > >> > > > Alexander Fedulov >> > > > >> > > > >> > > > >> > > > On Mon, Jun 20, 2022 at 4:31 PM David Anderson < >> > > > >> dander...@apache.org> >> > > > >> > > > wrote: >> > > > >> > > > >> > > > >> > > > > I'm very happy with this. +1 >> > > > >> > > > > >> > > > >> > > > > A lot of SourceFunction implementations used in demos/POC >> > > > >> > > implementations >> > > > >> > > > > include a call to sleep(), so adding rate limiting is a >> good >> > > > >> idea, in >> > > > >> > > my >> > > > >> > > > > opinion. 
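The rate-limiting semantics discussed in this thread (keeping the global rate unchanged when the job is rescaled) come down to simple arithmetic once a reader knows the current parallelism. The sketch below is only an illustration under that assumption: RateSplit, perSubtaskRate and pauseNanos are hypothetical names, and the parallelism argument stands for whatever value a reader would obtain, for example from a currentParallelism() method as proposed for the SourceReaderContext.

```java
// Hypothetical helper (not part of Flink): splits a global rate across subtasks.
final class RateSplit {
    private RateSplit() {}

    // Rate each subtask may emit so that all subtasks together match the global rate.
    static double perSubtaskRate(double globalRecordsPerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return globalRecordsPerSecond / parallelism;
    }

    // Pause between two records of one subtask, in nanoseconds, for a given rate.
    static long pauseNanos(double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return (long) (1_000_000_000L / recordsPerSecond);
    }
}
```

With a global rate of 1000 records per second, four subtasks would each emit 250 records per second, and eight subtasks 125 each, so the total stays the same after rescaling. With a limiter that is fully configured in the main method (option 1), the per-reader rate is fixed instead, so the total grows with the parallelism.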
>> > > > >> > > > >
>> > > > >> > > > > Best,
>> > > > >> > > > > David
>> > > > >> > > > >
>> > > > >> > > > > On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>> > > > >> > > > >
>> > > > >> > > > > > Hi Alexander,
>> > > > >> > > > > >
>> > > > >> > > > > > Thanks for creating this FLIP! I’d like to share some thoughts.
>> > > > >> > > > > >
>> > > > >> > > > > > 1. About the “generatorFunction”: I’m expecting an initializer on it,
>> > > > >> > > > > > because it’s hard to require that all fields in the generator function
>> > > > >> > > > > > are serializable in the user’s implementation. Providing a method like
>> > > > >> > > > > > “open” in the interface would let the function perform its
>> > > > >> > > > > > initialization during the task initialization stage.
>> > > > >> > > > > >
>> > > > >> > > > > > 2. As for the throttling functionality you mentioned, there’s a
>> > > > >> > > > > > FlinkConnectorRateLimiter under flink-core and maybe we could reuse
>> > > > >> > > > > > this interface. Actually, I would prefer to make rate limiting a common
>> > > > >> > > > > > feature provided in the Source API, but this requires another FLIP and
>> > > > >> > > > > > a lot of discussion, so I’m OK with having it in the DataGen source
>> > > > >> > > > > > first.
>> > > > >> > > > > >
>> > > > >> > > > > > Best regards,
>> > > > >> > > > > > Qingsheng
>> > > > >> > > > > >
>> > > > >> > > > > > > On Jun 17, 2022, at 01:47, Alexander Fedulov <alexan...@ververica.com> wrote:
>> > > > >> > > > > > >
>> > > > >> > > > > > > Hi Jing,
>> > > > >> > > > > > >
>> > > > >> > > > > > > thanks for your thorough analysis.
I agree with the >> > points >> > > > you >> > > > >> > make >> > > > >> > > > and >> > > > >> > > > > > > also with the idea to approach the larger task of >> > > providing >> > > > a >> > > > >> > > > universal >> > > > >> > > > > > > (DataStream + SQL) data generator base iteratively. >> > > > >> > > > > > > Regarding the name, the SourceFunction-based >> > > > >> > *DataGeneratorSource* >> > > > >> > > > > > resides >> > > > >> > > > > > > in the >> > > > >> > *org.apache.flink.streaming.api.functions.source.datagen*. I >> > > > >> > > > > think >> > > > >> > > > > > > it is OK to simply place the new one (with the same >> > name) >> > > > >> next to >> > > > >> > > the >> > > > >> > > > > > > *NumberSequenceSource* into >> > > > >> > > > > *org.apache.flink.api.connector.source.lib*. >> > > > >> > > > > > > >> > > > >> > > > > > > One more thing I wanted to discuss: I noticed that >> > > > >> > > > *DataGenTableSource >> > > > >> > > > > > *has >> > > > >> > > > > > > built-in throttling functionality (*rowsPerSecond*). >> I >> > > > >> believe it >> > > > >> > > is >> > > > >> > > > > > > something that could be also useful for the >> DataStream >> > > users >> > > > >> of >> > > > >> > the >> > > > >> > > > > > > stateless data generator and since we want to >> eventually >> > > > >> converge >> > > > >> > > on >> > > > >> > > > > the >> > > > >> > > > > > > same implementation for DataStream and Table/SQL it >> > sounds >> > > > >> like a >> > > > >> > > > good >> > > > >> > > > > > idea >> > > > >> > > > > > > to add it to the FLIP. What do you think? 
>> > > > >> > > > > > > >> > > > >> > > > > > > Best, >> > > > >> > > > > > > Alexander Fedulov >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge < >> > > j...@ververica.com >> > > > > >> > > > >> > > wrote: >> > > > >> > > > > > > >> > > > >> > > > > > >> Hi, >> > > > >> > > > > > >> >> > > > >> > > > > > >> After reading all discussions posted in this thread >> and >> > > the >> > > > >> > source >> > > > >> > > > > code >> > > > >> > > > > > of >> > > > >> > > > > > >> DataGeneratorSource which unfortunately used >> "Source" >> > > > >> instead of >> > > > >> > > > > > >> "SourceFunction" in its name, issues could >> summarized >> > as >> > > > >> > > following: >> > > > >> > > > > > >> >> > > > >> > > > > > >> 1. The current DataGeneratorSource based on >> > > SourceFunction >> > > > >> is a >> > > > >> > > > > stateful >> > > > >> > > > > > >> source connector and built for Table/SQL. >> > > > >> > > > > > >> 2. The right name for the new data generator source >> > i.e. >> > > > >> > > > > > >> DataGeneratorSource has been used for the current >> > > > >> implementation >> > > > >> > > > based >> > > > >> > > > > > on >> > > > >> > > > > > >> SourceFunction. >> > > > >> > > > > > >> 3. A new data generator source should be developed >> > based >> > > on >> > > > >> the >> > > > >> > > new >> > > > >> > > > > > Source >> > > > >> > > > > > >> API. >> > > > >> > > > > > >> 4. The new data generator source should be used both >> > for >> > > > >> > > DataStream >> > > > >> > > > > and >> > > > >> > > > > > >> Table/SQL, which means the current >> DataGeneratorSource >> > > > >> should be >> > > > >> > > > > > replaced >> > > > >> > > > > > >> with the new one. >> > > > >> > > > > > >> 5. The core event generation logic should be >> pluggable >> > to >> > > > >> > support >> > > > >> > > > > > various >> > > > >> > > > > > >> (test) scenarios, e.g. 
>> > > > >> > > > > > >> random stream, changelog stream, controllable events per checkpoint, etc.
>> > > > >> > > > > > >>
>> > > > >> > > > > > >> This leads to the following:
>> > > > >> > > > > > >>
>> > > > >> > > > > > >> To solve 1+3+4 -> we will have to make a big effort to replace the
>> > > > >> > > > > > >> current DataGeneratorSource, since the new Source API has a very
>> > > > >> > > > > > >> different concept, especially for the stateful part.
>> > > > >> > > > > > >> To solve 2+3 -> we have to find another name for the new implementation.
>> > > > >> > > > > > >> To solve 1+3+4+5 -> it gets even more complicated to support stateless
>> > > > >> > > > > > >> and stateful scenarios simultaneously with one solution.
>> > > > >> > > > > > >>
>> > > > >> > > > > > >> If we want to solve all of these issues in one shot, it might take
>> > > > >> > > > > > >> months. Therefore, I would suggest starting small and growing
>> > > > >> > > > > > >> iteratively.
>> > > > >> > > > > > >>
>> > > > >> > > > > > >> The proposal for the kickoff is to focus on stateless event generation
>> > > > >> > > > > > >> with e.g. a random stream and use the name "StatelessDataGeneratorSource".
>> > > > >> > > > > > >> There will be a period of time during which both DataGeneratorSource
>> > > > >> > > > > > >> implementations will be used by developers. The current
>> > > > >> > > > > > >> DataGeneratorSource will then be deprecated once we can (iteratively):
>> > > > >> > > > > > >> 1. either enlarge the scope of StatelessDataGeneratorSource to be able
>> > > > >> > > > > > >> to cover stateful scenarios and rename it to "DataGeneratorSourceV2"
>> > > > >> > > > > > >> (following the naming convention of SinkV2), or
>> > > > >> > > > > > >> 2. develop a new "StatefulDataGeneratorSource" based on the Source API
>> > > > >> > > > > > >> which can handle the stateful scenarios, if it is impossible to support
>> > > > >> > > > > > >> both stateless and stateful scenarios with one GeneratorSource
>> > > > >> > > > > > >> implementation.
>> > > > >> > > > > > >>
>> > > > >> > > > > > >> Best regards,
>> > > > >> > > > > > >> Jing
>> > > > >> > > > > > >>
>> > > > >> > > > > > >> On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <martijnvis...@apache.org> wrote:
>> > > > >> > > > > > >>
>> > > > >> > > > > > >>> Hey Alex,
>> > > > >> > > > > > >>>
>> > > > >> > > > > > >>> Yes, I think we need to make sure that we're not causing confusion (I
>> > > > >> > > > > > >>> know I already was confused). I think the DataSupplierSource is already
>> > > > >> > > > > > >>> better, but perhaps there are others who have an even better idea.
>> > > > >> > > > > > >>>
>> > > > >> > > > > > >>> Thanks,
>> > > > >> > > > > > >>>
>> > > > >> > > > > > >>> Martijn
>> > > > >> > > > > > >>>
>> > > > >> > > > > > >>> On Thu, Jun 9, 2022 at 14:28 Alexander Fedulov <alexan...@ververica.com> wrote:
>> > > > >> > > > > > >>>
>> > > > >> > > > > > >>>> Hi Martijn,
>> > > > >> > > > > > >>>>
>> > > > >> > > > > > >>>> It seems that they serve somewhat different purposes, though.
>> > > > >> The >> > > > >> > > > > > >>>> DataGenTableSource is for generating random data >> > > > described >> > > > >> by >> > > > >> > > the >> > > > >> > > > > > Table >> > > > >> > > > > > >>>> DDL and is tied into the >> > RowDataGenerator/DataGenerator >> > > > >> > concept >> > > > >> > > > > which >> > > > >> > > > > > is >> > > > >> > > > > > >>>> implemented as an Iterator<T>. The proposed API >> in >> > > > >> contrast >> > > > >> > is >> > > > >> > > > > > supposed >> > > > >> > > > > > >>>> to provide users with an easy way to supply their >> > > custom >> > > > >> data. >> > > > >> > > > > Another >> > > > >> > > > > > >>>> difference is that a DataGenerator is supposed to >> be >> > > > >> stateful >> > > > >> > > and >> > > > >> > > > > has >> > > > >> > > > > > to >> > > > >> > > > > > >>>> snapshot its state, whereas the proposed API is >> > purely >> > > > >> driven >> > > > >> > by >> > > > >> > > > the >> > > > >> > > > > > >>> input >> > > > >> > > > > > >>>> index IDs and can be stateless yet remain >> > > deterministic. >> > > > >> Are >> > > > >> > you >> > > > >> > > > > sure >> > > > >> > > > > > it >> > > > >> > > > > > >>>> is a good idea to mix them into the same API? We >> > could >> > > > >> think >> > > > >> > of >> > > > >> > > > > using >> > > > >> > > > > > a >> > > > >> > > > > > >>>> different name to make it less confusing for the >> > users >> > > > >> > > (something >> > > > >> > > > > like >> > > > >> > > > > > >>>> DataSupplierSource). 
>> > > > >> > > > > > >>>> >> > > > >> > > > > > >>>> Best, >> > > > >> > > > > > >>>> Alexander Fedulov >> > > > >> > > > > > >>>> >> > > > >> > > > > > >>>> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser < >> > > > >> > > > > > martijnvis...@apache.org >> > > > >> > > > > > >>>> >> > > > >> > > > > > >>>> wrote: >> > > > >> > > > > > >>>> >> > > > >> > > > > > >>>>> Hi Alex, >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>>>> Thanks for creating the FLIP and opening up the >> > > > >> discussion. >> > > > >> > +1 >> > > > >> > > > > > overall >> > > > >> > > > > > >>> for >> > > > >> > > > > > >>>>> getting this in place. >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>>>> One question: you've already mentioned that this >> > > > focussed >> > > > >> on >> > > > >> > > the >> > > > >> > > > > > >>>>> DataStream >> > > > >> > > > > > >>>>> API. I think it would be a bit confusing that we >> > have >> > > a >> > > > >> > Datagen >> > > > >> > > > > > >>> connector >> > > > >> > > > > > >>>>> (on the Table side) that wouldn't leverage this >> > target >> > > > >> > > > interface. I >> > > > >> > > > > > >>> think >> > > > >> > > > > > >>>>> it would be good if we could already have one >> > generic >> > > > >> Datagen >> > > > >> > > > > > connector >> > > > >> > > > > > >>>>> which works for both DataStream API (so that >> would >> > be >> > > a >> > > > >> new >> > > > >> > one >> > > > >> > > > in >> > > > >> > > > > > the >> > > > >> > > > > > >>>>> Flink repo) and that the Datagen in the Table >> > > landscape >> > > > is >> > > > >> > > using >> > > > >> > > > > this >> > > > >> > > > > > >>>>> target interface too. What do you think? >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>>>> Best regards, >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>>>> Martijn >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>>>> Op wo 8 jun. 
2022 om 14:21 schreef Alexander >> > Fedulov < >> > > > >> > > > > > >>>>> alexan...@ververica.com>: >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>>>>> Hi Xianxun, >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> Thanks for bringing it up. I do believe it >> would be >> > > > >> useful >> > > > >> > to >> > > > >> > > > have >> > > > >> > > > > > >>> such >> > > > >> > > > > > >>>>> a >> > > > >> > > > > > >>>>>> CDC data generator but I see the >> > > > >> > > > > > >>>>>> efforts to provide one a bit orthogonal to the >> > > > >> > > > DataSourceGenerator >> > > > >> > > > > > >>>>> proposed >> > > > >> > > > > > >>>>>> in the FLIP. FLIP-238 focuses >> > > > >> > > > > > >>>>>> on the DataStream API and I could see >> integration >> > > into >> > > > >> the >> > > > >> > > > > Table/SQL >> > > > >> > > > > > >>>>>> ecosystem as the next step that I would >> > > > >> > > > > > >>>>>> prefer to keep separate (see KafkaDynamicSource >> > > reusing >> > > > >> > > > > > >>>>>> KafkaSource<RowData> >> > > > >> > > > > > >>>>>> under the hood [1]). 
>> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> [1] >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>> >> > > > >> > > > > > >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > >> > > > >> > >> > > > >> >> > > > >> > > >> > >> https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223 >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> Best, >> > > > >> > > > > > >>>>>> Alexander Fedulov >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye < >> > > > >> > yxx_c...@163.com> >> > > > >> > > > > > wrote: >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>>>> Hey Alexander, >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> Making datagen source connector easier to use >> is >> > > > really >> > > > >> > > helpful >> > > > >> > > > > > >>> during >> > > > >> > > > > > >>>>>>> doing some PoC/Demo. >> > > > >> > > > > > >>>>>>> And I thought about is it possible to produce a >> > > > >> changelog >> > > > >> > > > stream >> > > > >> > > > > by >> > > > >> > > > > > >>>>>>> datagen source, so a new flink developer can >> > > practice >> > > > >> flink >> > > > >> > > sql >> > > > >> > > > > > >>> with >> > > > >> > > > > > >>>>> cdc >> > > > >> > > > > > >>>>>>> data using Flink SQL Client CLI. >> > > > >> > > > > > >>>>>>> In the flink-examples-table module, a >> > > > >> > ChangelogSocketExample >> > > > >> > > > > > >>> class[1] >> > > > >> > > > > > >>>>>>> describes how to ingest delete or insert data >> by >> > > 'nc' >> > > > >> > > command. 
>> > > > >> > > > > Can >> > > > >> > > > > > >>> we >> > > > >> > > > > > >>>>>>> support producing a changelog stream by the new >> > > > datagen >> > > > >> > > source? >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> [1] >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>> >> > > > >> > > > > > >>>>> >> > > > >> > > > > > >>> >> > > > >> > > > > > >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > >> > > > >> > >> > > > >> >> > > > >> > > >> > >> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79 >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> Best regards, >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> Xianxun >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> On 06/8/2022 08:10,Alexander Fedulov< >> > > > >> > alexan...@ververica.com >> > > > >> > > > >> > > > >> > > > > > >>>>>>> <alexan...@ververica.com> wrote: >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> I looked a bit further and it seems it should >> > > actually >> > > > >> be >> > > > >> > > > easier >> > > > >> > > > > > >>> than >> > > > >> > > > > > >>>>> I >> > > > >> > > > > > >>>>>>> initially thought: SourceReader extends >> > > > >> CheckpointListener >> > > > >> > > > > > >>> interface >> > > > >> > > > > > >>>>> and >> > > > >> > > > > > >>>>>>> with its custom implementation it should be >> > possible >> > > > to >> > > > >> > > achieve >> > > > >> > > > > > >>>>> similar >> > > > >> > > > > > >>>>>>> results. 
A prototype that I have for the >> generator >> > > > uses >> > > > >> an >> > > > >> > > > > > >>>>>>> IteratorSourceReader >> > > > >> > > > > > >>>>>>> under the hood by default but we could consider >> > > adding >> > > > >> the >> > > > >> > > > > ability >> > > > >> > > > > > >>> to >> > > > >> > > > > > >>>>>>> supply something like a >> > > > DataGeneratorSourceReaderFactory >> > > > >> > that >> > > > >> > > > > would >> > > > >> > > > > > >>>>> allow >> > > > >> > > > > > >>>>>>> provisioning the DataGeneratorSource with >> > customized >> > > > >> > > > > > >>> implementations >> > > > >> > > > > > >>>>> for >> > > > >> > > > > > >>>>>>> cases like this. >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> Best, >> > > > >> > > > > > >>>>>>> Alexander Fedulov >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> On Wed, Jun 8, 2022 at 12:58 AM Alexander >> Fedulov >> > < >> > > > >> > > > > > >>>>>> alexan...@ververica.com >> > > > >> > > > > > >>>>>>>> >> > > > >> > > > > > >>>>>>> wrote: >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> Hi Steven, >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> This is going to be tricky since in the new >> Source >> > > API >> > > > >> the >> > > > >> > > > > > >>>>> checkpointing >> > > > >> > > > > > >>>>>>> aspects that you based your logic on are pushed >> > > > further >> > > > >> > away >> > > > >> > > > from >> > > > >> > > > > > >>> the >> > > > >> > > > > > >>>>>>> low-level interfaces responsible for handling >> data >> > > and >> > > > >> > splits >> > > > >> > > > > [1]. 
>> > > > >> > > > > > >>> At >> > > > >> > > > > > >>>>> the >> > > > >> > > > > > >>>>>>> same time, the SourceCoordinatorProvider is >> > > hardwired >> > > > >> into >> > > > >> > > the >> > > > >> > > > > > >>>>> internals >> > > > >> > > > > > >>>>>>> of the framework, so I don't think it will be >> > > possible >> > > > >> to >> > > > >> > > > > provide a >> > > > >> > > > > > >>>>>>> customized implementation for testing purposes. >> > > > >> > > > > > >>>>>>> >> > > > >> > > > > > >>>>>>> The only chance to tie data generation to >> > > > checkpointing >> > > > >> in >> > > > >> > > the >> > > > >> > > > > new >> > > > >> > > > > > >>>>> Source >> > > > >> > > > > > >>>>>>> API that I see at the moment is via the >> > > > SplitEnumerator >> > > > >> > > > > serializer >> > > > >> > > > > > >>> ( >> > > > >> > > > > > >>>>>>> getEnumeratorCheckpointSerializer() method) >> [2]. >> > In >> > > > >> theory, >> > > > >> > > it >> > > > >> > > > > > >>> should >> > > > >> > > > > > >>>>> be >> > > > >> > > > > > >>>>>>> possible to share a variable visible both to >> the >> > > > >> generator >> > > > >> > > > > function >> > > > >> > > > > > >>>>> and >> > > > >> > > > > > >>>>>> to >> > > > >> > > > > > >>>>>>> the serializer and manipulate it whenever the >> > > > >> serialize() >> > > > >> > > > method >> > > > >> > > > > > >>> gets >> > > > >> > > > > > >>>>>>> called upon a checkpoint request. That said, >> you >> > > still >> > > > >> > won't >> > > > >> > > > get >> > > > >> > > > > > >>>>>>> notifications of successful checkpoints that >> you >> > > > >> currently >> > > > >> > > use >> > > > >> > > > > > >>> (this >> > > > >> > > > > > >>>>> info >> > > > >> > > > > > >>>>>>> is only available to the SourceCoordinator). 

In general, regardless of the generator implementation itself, the new Source API does not seem to support the use case of verifying checkpoint contents in lockstep with produced data. At least I do not see an immediate solution for this. Can you think of a different way of checking the correctness of the Iceberg Sink implementation that does not rely on this approach?

Best,
Alexander Fedulov

[1] https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337

[2]
https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97

On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:

In Iceberg source, we have a data generator source that can control the records per checkpoint cycle. Can we support something like this in the DataGeneratorSource?

https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java

public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)

Thanks,
Steven

On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi everyone,

I would like to open a discussion on FLIP-238: Introduce
FLIP-27-based Data Generator Source [1]. During the discussion about deprecating the SourceFunction API [2] it became evident that an easy-to-use FLIP-27-compatible data generator source is needed so that the current SourceFunction-based data generator implementations could be phased out for both Flink demo/PoC applications and for the internal Flink tests. This FLIP proposes to introduce a generic DataGeneratorSource capable of producing events of an arbitrary type based on a user-supplied MapFunction.

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/9Av1D
[2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9

Best,
Alexander Fedulov
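
As a rough illustration of what "producing events of an arbitrary type based on a user-supplied MapFunction" in the proposal above could mean, here is a minimal plain-Java sketch. It does not use any Flink classes: java.util.function.Function stands in for MapFunction, and the class name IndexMappedGeneratorSketch and the generate() helper are hypothetical names introduced only for this example. It assumes the function maps a running index to an event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class IndexMappedGeneratorSketch {
    // Produces `count` events by applying the user-supplied function
    // to the indices 0..count-1, in order.
    static <T> List<T> generate(long count, Function<Long, T> generatorFunction) {
        List<T> events = new ArrayList<>();
        for (long index = 0; index < count; index++) {
            events.add(generatorFunction.apply(index));
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> events = generate(3, index -> "event-" + index);
        System.out.println(events); // prints [event-0, event-1, event-2]
    }
}
```

Deriving each event purely from its index keeps the generator function stateless, so the only thing that would need to be tracked across restarts is the index itself.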