Hi devs,

Thanks for all the feedback.

I have discussed offline with @QingSheng Ren to confirm some questionable
points in the FLIP. Thanks for his valuable input; I have updated the FLIP
according to our discussion.

Looking forward to your feedback, thanks.

Best,
Shammon FY

On Wed, Jul 5, 2023 at 5:26 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for your feedback.
>
> > 1. TableColumnLineageRelation#sinkColumn() should return
> > TableColumnLineageEntity instead of String, right?
>
> `sinkColumn()` will return a `String`, which is the column name in the
> sink connector. I found that the name `TableColumnLineageEntity` may
> cause ambiguity, so I have renamed it to `TableColumnSourceLineageEntity`.
> As I see it, `TableColumnLineageRelation` represents the lineage of one
> sink column, and each sink column may be computed from multiple sources
> and columns. I use `TableColumnSourceLineageEntity` to manage each source
> and its columns for the sink column, so `TableColumnLineageRelation` has a
> sink column name and a list of `TableColumnSourceLineageEntity`.
>
> > 2. Since LineageRelation already contains all information to build the
> > lineage between sources and sink, do we still need to set the
> > LineageEntity in the source?
>
> The lineage interface of `DataStream` is very flexible. We have added
> `setLineageEntity` to the source to limit and verify user behavior,
> ensuring that users have not added non-existent sources as lineage.
>
> > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > Qingsheng mentioned. How about LineageVertex, LineageEdge, and
> > LineageEdges which contains multiple LineageEdge?
>
> We referred to `Atlas` for the lineage naming: it uses `Entity` and
> `Relation` to represent the lineage relationship, while another metadata
> service, `Datahub`, uses `DataSet` to represent the entity. I think
> `Entity` and `Relation` are nicer for lineage; what do you think?
>
> Best,
> Shammon FY
>
>
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <j...@ververica.com.invalid>
> wrote:
>
>> Hi Shammon,
>>
>> Thanks for your proposal. After reading the FLIP, I'd like to ask
>> some questions to make sure we are on the same page. Thanks!
>>
>> 1. TableColumnLineageRelation#sinkColumn() should return
>> TableColumnLineageEntity instead of String, right?
>>
>> 2. Since LineageRelation already contains all information to build the
>> lineage between sources and sink, do we still need to set the
>> LineageEntity in the source?
>>
>> 3. About the "Entity" and "Relation" naming, I was confused too, like
>> Qingsheng mentioned. How about LineageVertex, LineageEdge, and
>> LineageEdges which contains multiple LineageEdge? E.g. multiple sources
>> join into one sink, or, edges of columns from one or different tables,
>> etc.
>>
>> Best regards,
>> Jing
>>
>> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zjur...@gmail.com> wrote:
>>
>> > Hi yuxia and Yun,
>> >
>> > Thanks for your input.
>> >
>> > For yuxia:
>> > > 1: What kinds of JobStatus will the `JobExecutionStatusEvent`
>> > > include?
>> >
>> > At present, we only need to notify the listener when a job goes to
>> > termination, but I think it makes sense to add generic `oldStatus` and
>> > `newStatus` in the listener so that users can update the job state in
>> > their service as needed.
>> >
>> > > 2: I'm really confused about the `config()` included in
>> > > `LineageEntity`, where is it from and what is it for?
>> >
>> > The `config` in `LineageEntity` is used for users to get options for
>> > source and sink connectors. As in the examples in the FLIP, users can
>> > add server/group/topic information in the config for kafka and create
>> > lineage entities for `DataStream` jobs, then the listeners can get this
>> > information to identify the same connector in different jobs.
>> > Otherwise, the `config` in `TableLineageEntity` will be the same as
>> > `getOptions` in `CatalogBaseTable`.
>> >
>> > > 3: Regardless whether `inputChangelogMode` in
>> > > `TableSinkLineageEntity` is needed or not, since
>> > > `TableSinkLineageEntity` contains `inputChangelogMode`, why doesn't
>> > > `TableSourceLineageEntity` contain a changelog mode?
>> >
>> > At present, we do not actually use the changelog mode. It can be
>> > deleted, and I have updated the FLIP.
>> >
>> > > Btw, since there are a lot of interfaces proposed, I think it'll be
>> > > better to give an example about how to implement a listener in this
>> > > FLIP to help us understand the interfaces better.
>> >
>> > I have added the example in the FLIP, and the related interfaces and
>> > examples are in branch [1].
>> >
>> > For Yun:
>> > > I have one more question on the lookup-join dim tables, it seems this
>> > > FLIP does not touch them, and will they become part of the
>> > > List<LineageEntity> sources(), or will another interface be added?
>> >
>> > You're right, currently lookup join dim tables are not considered in
>> > the 'Proposed Changes' section of this FLIP. But the interface for
>> > lineage is universal, and we can provide
>> > `TableLookupSourceLineageEntity`, which implements
>> > `TableSourceLineageEntity`, in the future without modifying the public
>> > interface.
>> >
>> > > By the way, if you want to focus on job lineage instead of data
>> > > column lineage in this FLIP, why must we introduce so many
>> > > column-lineage related interfaces here?
>> >
>> > The lineage information in SQL jobs includes table lineage and column
>> > lineage. Although SQL jobs currently do not support column lineage, we
>> > would like to support this in the next step. So we have comprehensively
>> > considered the table lineage and column lineage interfaces here, and
>> > defined these two interfaces together clearly.
>> >
>> >
>> > [1]
>> > https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>> >
>> > Best,
>> > Shammon FY
>> >
>> >
>> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <myas...@live.com> wrote:
>> >
>> > > Hi Shammon,
>> > >
>> > > I like the idea in general, and it will help to analyze job lineage
>> > > for both Flink SQL and Flink jar jobs in production environments.
>> > >
>> > > For Qingsheng's concern, I prefer the name JobType over
>> > > RuntimeExecutionMode, as the latter is not easy for users to
>> > > understand.
>> > >
>> > > I have one more question on the lookup-join dim tables, it seems this
>> > > FLIP does not touch them, and will they become part of the
>> > > List<LineageEntity> sources(), or will another interface be added?
>> > >
>> > > By the way, if you want to focus on job lineage instead of data
>> > > column lineage in this FLIP, why must we introduce so many
>> > > column-lineage related interfaces here?
>> > >
>> > >
>> > > Best
>> > > Yun Tang
>> > > ________________________________
>> > > From: Shammon FY <zjur...@gmail.com>
>> > > Sent: Sunday, June 25, 2023 16:13
>> > > To: dev@flink.apache.org <dev@flink.apache.org>
>> > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
>> > > Listener
>> > >
>> > > Hi Qingsheng,
>> > >
>> > > Thanks for your valuable feedback.
>> > >
>> > > > 1. Is there any specific use case to expose the batch / streaming
>> > > > info to listeners or meta services?
>> > >
>> > > I agree with you that Flink is evolving towards batch-streaming
>> > > unification, but the lifecycles of the two kinds of jobs are
>> > > different. If a job processes a bounded dataset, it ends after
>> > > completing the data processing; otherwise, it runs for a long time.
>> > > In our scenario, we regularly schedule some Flink jobs to process
>> > > bounded datasets and, when these "batch" jobs finish, add job
>> > > information such as the scheduled timestamp and execution duration to
>> > > the lineage information, which is different from "streaming" jobs.
>> > > Currently Flink uses `RuntimeExecutionMode` and
>> > > `existsUnboundedSource` in `StreamGraph` and `StreamGraphGenerator`
>> > > to determine `JobType` and distinguish jobs. We can mark `JobType` as
>> > > `PublicEvolving`, or use `RuntimeExecutionMode` and a boolean flag.
>> > > What do you think?
>> > >
>> > > > 2. it’s better to be more specific here to tell users what
>> > > > information they could expect to see here, instead of just a “job
>> > > > configuration” as described in JavaDoc.
>> > >
>> > > Thanks, I have updated the doc in the FLIP.
>> > >
>> > > > 3. About the IO executor in
>> > > > JobStatusChangedListenerFactory.Context.
>> > >
>> > > I have updated the docs for the IO executor in
>> > > `JobStatusChangedListenerFactory.Context`: it is a regular thread
>> > > pool that executes submitted tasks in parallel. Users can submit
>> > > tasks to the executor, which ensures that submitted tasks are
>> > > executed before the job exits.
>> > >
>> > > > 4. I don’t quite get the LineageRelationEntity, which is just a
>> > > > list of LineageEntity.
>> > >
>> > > In the initial idea, the `LineageRelationEntity` is used for
>> > > `DataStream` to set additional lineage information besides the
>> > > source. For example, there are table and column lineages in SQL jobs.
>> > > When we build a `DataStream` job with a table source and sink, we can
>> > > add table lineage with the following method.
>> > > ```
>> > > public class DataStreamSink {
>> > >     public DataStreamSink setLineageSources(LineageEntity ... sources);
>> > > }
>> > > ```
>> > > But we cannot set column lineage for the above sink, and for the sake
>> > > of universality, we do not want to add a method similar to
>> > > `addLineageColumn(...)` in `DataStreamSink`. So I put this
>> > > information into LineageRelationEntity so that SQL and DataStream
>> > > jobs can be consistent. But as you mentioned, this approach does
>> > > indeed lead to ambiguity and complexity. So my current idea is to add
>> > > the `setLineageRelation` method in `DataStreamSink` directly, without
>> > > `LineageRelationEntity`. I have updated the FLIP; please help to
>> > > review it again, thanks.
>> > >
>> > > > 5. I can’t find the definition of CatalogContext in the current
>> > > > code base and Flink, which appears in the TableLineageEntity.
>> > >
>> > > CatalogContext is defined in FLIP-294, and I have updated the FLIP.
>> > >
>> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>> > > > boolean (the “override” is quite confusing). I’m wondering if these
>> > > > are necessary for meta services, as they are actually concepts
>> > > > defined in the runtime level of Flink Table / SQL.
>> > >
>> > > The information in `TableSinkLineageEntity`, such as `ModifyType`,
>> > > `ChangelogMode` and `override`, is mainly used for verification and
>> > > display. For example, Flink currently supports `INSERT`/`DELETE` and
>> > > `UPDATE`; we only want to report and update lineage for `INSERT` jobs
>> > > in our streaming & batch ETL, and display the `override` information
>> > > on the UI.
>> > >
>> > >
>> > > Best,
>> > > Shammon FY
>> > >
>> > >
>> > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
>> > > wrote:
>> > >
>> > > > Hi Shammon,
>> > > >
>> > > > Thanks for starting this FLIP! Data lineage is a very important
>> > > > topic, which has been missing for a long time in Flink. I have some
>> > > > questions about the FLIP.
>> > > >
>> > > > About events and listeners:
>> > > >
>> > > > 1. I’m not sure if it is necessary to expose JobType in
>> > > > JobCreatedEvent. This is an internal class in flink-runtime, and I
>> > > > think the correct API should be RuntimeExecutionMode. Furthermore,
>> > > > I think the boundary of batch and streaming becomes much more vague
>> > > > as Flink is evolving towards batch-streaming unification, so I’m
>> > > > concerned about exposing JobType as a public API. Is there any
>> > > > specific use case to expose the batch / streaming info to listeners
>> > > > or meta services?
>> > > >
>> > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
>> > > > ambiguous. To be honest the configuration is quite a mess in Flink,
>> > > > so maybe it’s better to be more specific here to tell users what
>> > > > information they could expect to see here, instead of just a “job
>> > > > configuration” as described in JavaDoc.
>> > > >
>> > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor.
>> > > > I think more information should be provided here, such as which
>> > > > thread model this executor could promise, and whether the user
>> > > > should care about concurrency issues. Otherwise I prefer not to
>> > > > give such a utility that no one dares to use safely, and leave it
>> > > > to users to choose their implementation.
>> > > >
>> > > > About lineage:
>> > > >
>> > > > 4. I don’t quite get the LineageRelationEntity, which is just a
>> > > > list of LineageEntity. Could you elaborate more on this class? From
>> > > > my naive imagination, the lineage is shaped as a DAG, where
>> > > > vertices are sources and sinks (LineageEntity) and edges are
>> > > > connections between them (LineageRelation), so it is a bit
>> > > > confusing for a name mixing these two concepts.
>> > > >
>> > > > 5. I can’t find the definition of CatalogContext in the current
>> > > > code base and Flink, which appears in the TableLineageEntity.
>> > > >
>> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>> > > > boolean (the “override” is quite confusing). I’m wondering if these
>> > > > are necessary for meta services, as they are actually concepts
>> > > > defined in the runtime level of Flink Table / SQL.
>> > > >
>> > > > Best,
>> > > > Qingsheng
>> > > >
>> > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Hi devs,
>> > > > >
>> > > > > Is there any comment or feedback for this FLIP? Hope to hear from
>> > > > > you, thanks
>> > > > >
>> > > > > Best,
>> > > > > Shammon FY
>> > > > >
>> > > > >
>> > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi devs,
>> > > > > >
>> > > > > > I would like to start a discussion on FLIP-314: Support
>> > > > > > Customized Job Lineage Listener[1], which is the next stage of
>> > > > > > FLIP-294 [2]. Flink streaming and batch jobs create lineage
>> > > > > > dependencies between sources and sinks, and users can manage
>> > > > > > their data and jobs according to this lineage information. For
>> > > > > > example, when there is a delay in Flink ETL or data, users can
>> > > > > > easily trace the problematic jobs and affected data. On the
>> > > > > > other hand, when users need to correct data or debug, they can
>> > > > > > perform operations based on lineage too.
>> > > > > >
>> > > > > > In FLIP-314 we want to introduce lineage related interfaces for
>> > > > > > Flink so that users can create customized job status listeners.
>> > > > > > When the job status changes, users can get the job status and
>> > > > > > information to add, update or delete lineage.
>> > > > > >
>> > > > > > Looking forward to your feedback, thanks.
>> > > > > >
>> > > > > >
>> > > > > > [1]
>> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>> > > > > > [2]
>> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>> > > > > >
>> > > > > > Best,
>> > > > > > Shammon FY
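
For readers following the column-lineage discussion in the reply to Jing, the shape described there (one sink column name plus a list of per-source entries, each holding a source and its contributing columns) could be sketched roughly as below. This is only an illustration under stated assumptions: the class names `ColumnLineageSketch`, `SourceColumns` and `ColumnLineage`, the helper `upstreamColumnCount`, and the table and column names are all hypothetical stand-ins, not the interfaces defined in the FLIP.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, simplified stand-ins for the column-lineage types discussed
 * in the thread. These are NOT the interfaces defined in the FLIP.
 */
final class ColumnLineageSketch {

    /** One upstream table plus the columns of it that feed a sink column. */
    static final class SourceColumns {
        final String table;
        final List<String> columns;

        SourceColumns(String table, List<String> columns) {
            this.table = table;
            this.columns = Collections.unmodifiableList(columns);
        }
    }

    /** Lineage of a single sink column: its name and every contributing source. */
    static final class ColumnLineage {
        final String sinkColumn;
        final List<SourceColumns> sources;

        ColumnLineage(String sinkColumn, List<SourceColumns> sources) {
            this.sinkColumn = sinkColumn;
            this.sources = Collections.unmodifiableList(sources);
        }
    }

    /** Counts the upstream columns that contribute to the sink column. */
    static int upstreamColumnCount(ColumnLineage lineage) {
        int count = 0;
        for (SourceColumns source : lineage.sources) {
            count += source.columns.size();
        }
        return count;
    }

    /** Builds an example: one sink column derived from two made-up tables. */
    static ColumnLineage example() {
        return new ColumnLineage(
                "total_amount",
                Arrays.asList(
                        new SourceColumns("orders", Arrays.asList("price", "quantity")),
                        new SourceColumns("discounts", Arrays.asList("rate"))));
    }

    public static void main(String[] args) {
        ColumnLineage lineage = example();
        // Print one line per contributing source table.
        for (SourceColumns source : lineage.sources) {
            System.out.println(lineage.sinkColumn + " <- " + source.table + source.columns);
        }
        System.out.println("upstream columns: " + upstreamColumnCount(lineage));
    }
}
```

Keeping the sink side as a plain column name and grouping the upstream columns per source mirrors the explanation given for why `sinkColumn()` returns a `String`; a real listener would presumably receive such objects from Flink rather than build them by hand.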