Hi devs,

Thanks for all the feedback.

I have discussed with @QingSheng Ren off-line to confirm some questionable
points in the FLIP. Thanks for his valuable inputs and I have updated the
FLIP according to our discussion.

Looking forward to your feedback, thanks,

Best,
Shammon FY


On Wed, Jul 5, 2023 at 5:26 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for your feedback.
>
> > 1. TableColumnLineageRelation#sinkColumn() should return
> TableColumnLineageEntity instead of String, right?
>
> The `sinkColumn()` will return `String` which is the column name in the
> sink connector. I found the name of `TableColumnLineageEntity` may
> cause ambiguity and I have renamed it to `TableColumnSourceLineageEntity`.
> In my mind the `TableColumnLineageRelation` represents the lineage for each
> sink column, each column may be computed from multiple sources and columns.
> I use `TableColumnSourceLineageEntity` to manage each source and its
> columns for the sink column, so `TableColumnLineageRelation` has a sink
> column name and `TableColumnSourceLineageEntity` list.
>
> > 2. Since LineageRelation already contains all information to build the
> lineage between sources and sink, do we still need to set the LineageEntity
> in the source?
>
> The lineage interface of `DataStream` is very flexible. We have added
> `setLineageEntity` to the source to limit and verify user behavior,
> ensuring that users have not added non-existent sources as lineage.
>
> > 3. About the "Entity" and "Relation" naming, I was confused too, like
> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> which contains multiple LineageEdge?
>
> We referred to `Atlas` for the name of lineage, it uses `Entity` and
> `Relation` to represent the lineage relationship and another metadata
> service `Datahub` uses `DataSet` to represent the entity. I think `Entity`
> and `Relation` are nicer for lineage, what do you think of it?
>
> Best,
> Shammon FY
>
>
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <j...@ververica.com.invalid>
> wrote:
>
>> Hi Shammon,
>>
>> Thanks for your proposal. After reading the FLIP, I'd like to ask
>> some questions to make sure we are on the same page. Thanks!
>>
>> 1. TableColumnLineageRelation#sinkColumn() should return
>> TableColumnLineageEntity instead of String, right?
>>
>> 2. Since LineageRelation already contains all information to build the
>> lineage between sources and sink, do we still need to set the
>> LineageEntity
>> in the source?
>>
>> 3. About the "Entity" and "Relation" naming, I was confused too, like
>> Qingsheng mentioned. How about LineageVertex, LineageEdge, and
>> LineageEdges
>> which contains multiple LineageEdge? E.g. multiple sources join into one
>> sink, or, edges of columns from one or different tables, etc.
>>
>> Best regards,
>> Jing
>>
>> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zjur...@gmail.com> wrote:
>>
>> > Hi yuxia and Yun,
>> >
>> > Thanks for your input.
>> >
>> > For yuxia:
>> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven`
>> including?
>> >
>> > At present, we only need to notify the listener when a job goes to
>> > termination, but I think it makes sense to add generic `oldStatus` and
>> > `newStatus` in the listener and users can update the job state in their
>> > service as needed.
>> >
>> > > 2: I'm really confused about the `config()` included in
>> `LineageEntity`,
>> > where is it from and what is it for ?
>> >
>> > The `config` in `LineageEntity` is used for users to get options for
>> source
>> > and sink connectors. As the examples in the FLIP, users can add
>> > server/group/topic information in the config for kafka and create
>> lineage
>> > entities for `DataStream` jobs, then the listeners can get this
>> information
>> > to identify the same connector in different jobs. Otherwise, the
>> `config`
>> > in `TableLineageEntity` will be the same as `getOptions` in
>> > `CatalogBaseTable`.
>> >
>> > > 3: Regardless whether `inputChangelogMode` in
>> `TableSinkLineageEntity` is
>> > needed or not, since `TableSinkLineageEntity` contains
>> > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
>> > changelogmode?
>> >
>> > At present, we do not actually use the changelog mode. It can be
>> deleted,
>> > and I have updated FLIP.
>> >
>> > > Btw, since there're a lot interfaces proposed, I think it'll be
>> better to
>> > give an example about how to implement a listener in this FLIP to make
>> us
>> > know better about the interfaces.
>> >
>> > I have added the example in the FLIP and the related interfaces and
>> > examples are in branch [1].
>> >
>> > For Yun:
>> > > I have one more question on the lookup-join dim tables, it seems this
>> > FLIP does not touch them, and will them become part of the
>> > List<LineageEntity> sources() or adding another interface?
>> >
>> > You're right, currently lookup join dim tables were not considered in
>> the
>> > 'proposed changed' section of this FLIP. But the interface for lineage
>> is
>> > universal and we can give `TableLookupSourceLineageEntity` which
>> implements
>> > `TableSourceLineageEntity` in the future without modifying the public
>> > interface.
>> >
>> > > By the way, if you want to focus on job lineage instead of data column
>> > lineage in this FLIP, why we must introduce so many column-lineage
>> related
>> > interface here?
>> >
>> > The lineage information in SQL jobs includes table lineage and column
>> > lineage. Although SQL jobs currently do not support column lineage, we
>> > would like to support this in the next step. So we have comprehensively
>> > considered the table lineage and column lineage interfaces here, and
>> > defined these two interfaces together clearly
>> >
>> >
>> > [1]
>> >
>> >
>> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>> >
>> > Best,
>> > Shammon FY
>> >
>> >
>> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <myas...@live.com> wrote:
>> >
>> > > Hi Shammon,
>> > >
>> > > I like the idea in general and it will help to analysis the job
>> lineages
>> > > no matter FlinkSQL or Flink jar jobs in production environments.
>> > >
>> > > For Qingsheng's concern, I'd like the name of JobType more than
>> > > RuntimeExecutionMode, as the latter one is not easy to understand for
>> > users.
>> > >
>> > > I have one more question on the lookup-join dim tables, it seems this
>> > FLIP
>> > > does not touch them, and will them become part of the
>> List<LineageEntity>
>> > > sources()​ or adding another interface?
>> > >
>> > > By the way, if you want to focus on job lineage instead of data column
>> > > lineage in this FLIP, why we must introduce so many column-lineage
>> > related
>> > > interface here?
>> > >
>> > >
>> > > Best
>> > > Yun Tang
>> > > ________________________________
>> > > From: Shammon FY <zjur...@gmail.com>
>> > > Sent: Sunday, June 25, 2023 16:13
>> > > To: dev@flink.apache.org <dev@flink.apache.org>
>> > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
>> Listener
>> > >
>> > > Hi Qingsheng,
>> > >
>> > > Thanks for your valuable feedback.
>> > >
>> > > > 1. Is there any specific use case to expose the batch / streaming
>> info
>> > to
>> > > listeners or meta services?
>> > >
>> > > I agree with you that Flink is evolving towards batch-streaming
>> > > unification, but the lifecycle of them is different. If a job
>> processes a
>> > > bound dataset, it will end after completing the data processing,
>> > otherwise,
>> > > it will run for a long time. In our scenario, we will regularly
>> schedule
>> > > some Flink jobs to process bound dataset and update some job
>> information
>> > to
>> > > the lineage information for the "batch" jobs such as scheduled
>> timestamp,
>> > > execution duration when jobs are finished, which is different from
>> > > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
>> > > `existsUnboundedSource` in `StreamingGraph` and
>> `StreamingGraphGenerator`
>> > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
>> > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag,
>> what
>> > do
>> > > you think of it?
>> > >
>> > > > 2. it’s better to be more specific here to tell users what
>> information
>> > > they could expect to see here, instead of just a “job configuration”
>> as
>> > > described in JavaDoc.
>> > >
>> > > Thanks and I have updated the doc in FLIP.
>> > >
>> > > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
>> > >
>> > > I have updated the docs for io executor  in
>> > > `JobStatusChangedListenerFactory.Context`, it is a regular thread pool
>> > and
>> > > executes submitted tasks in parallel. Users can submit tasks to the
>> > > executor which ensures that the submitted task can be executed before
>> the
>> > > job exits.
>> > >
>> > > > 4. I don’t quite get the LineageRelationEntity, which is just a
>> list of
>> > > LineageEntity.
>> > >
>> > > In the initial idea, the `LineageRelationEntity` is used for
>> `DataStream`
>> > > to set additional lineage information besides source. For example,
>> there
>> > > are table and column lineages in SQL jobs. When we build a
>> `DataStream`
>> > job
>> > > with table source and sink, we can add table lineage in the following
>> > > method.
>> > > ```
>> > > public class DataStreamSink {
>> > >     public DataStreamSink setLineageSources(LineageEntity ...
>> sources);
>> > > }
>> > > ```
>> > > But we can not set column lineage for the above sink, and for the
>> sake of
>> > > universality, we do not want to add a method similar to
>> `addLineageColumn
>> > > (...)` in `DataStreamSink`. So I put this information into
>> > > LineageRelationEntity so that SQL and DataStream jobs can be
>> consistent.
>> > > But as you mentioned, this approach does indeed lead to ambiguity and
>> > > complexity. So my current idea is to add the `setLineageRelation`
>> method
>> > in
>> > > `DataStreamSink` directly without `LineageRelationEntity`, I have
>> updated
>> > > the FLIP and please help to review it again, thanks.
>> > >
>> > > > 5. I can’t find the definition of CatalogContext in the current code
>> > base
>> > > and Flink, which appears in the TableLineageEntity.
>> > >
>> > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
>> > >
>> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>> > boolean
>> > > (the “override” is quite confusing). I’m wondering if these are
>> necessary
>> > > for meta services, as they are actually concepts defined in the
>> runtime
>> > > level of Flink Table / SQL.
>> > >
>> > > The information in `TableSinkLineageEntity` such as `ModifyType`,
>> > > `ChangelogMode` and `override` are mainly used for verification and
>> > > display. For example, Flink currently supports `INSERT`/`DELETE` and
>> > > `UPDATE`, we only want to report and update lineage for `INSERT` jobs
>> in
>> > > our streaming & batch ETL, and display the `override` information on
>> the
>> > > UI.
>> > >
>> > >
>> > > Best,
>> > > Shammon FY
>> > >
>> > >
>> > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
>> wrote:
>> > >
>> > > > Hi Shammon,
>> > > >
>> > > > Thanks for starting this FLIP! Data lineage is a very important
>> topic,
>> > > > which has been missing for a long time in Flink. I have some
>> questions
>> > > > about the FLIP.
>> > > >
>> > > > About events and listeners:
>> > > >
>> > > > 1. I’m not sure if it is necessary to expose JobType to in
>> > > JobCreatedEvent.
>> > > > This is an internal class in flink-runtime, and I think the correct
>> API
>> > > > should be RuntimeExecutionMode. Furthermore, I think the boundary of
>> > > batch
>> > > > and streaming becomes much more vague as Flink is evolving towards
>> > > > batch-streaming unification, so I’m concerned about exposing JobType
>> > as a
>> > > > public API. Is there any specific use case to expose the batch /
>> > > streaming
>> > > > info to listeners or meta services?
>> > > >
>> > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
>> > > > ambiguous. To be honest the configuration is quite a mess in Flink,
>> so
>> > > > maybe it’s better to be more specific here to tell users what
>> > information
>> > > > they could expect to see here, instead of just a “job
>> configuration” as
>> > > > described in JavaDoc.
>> > > >
>> > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor.
>> I
>> > > think
>> > > > more information should be provided here, such as which thread model
>> > this
>> > > > executor could promise, and whether the user should care about
>> > > concurrency
>> > > > issues. Otherwise I prefer not to give such an utility that no one
>> > dares
>> > > to
>> > > > use safely, and leave it to users to choose their implementation.
>> > > >
>> > > > About lineage:
>> > > >
>> > > > 4. I don’t quite get the LineageRelationEntity, which is just a
>> list of
>> > > > LineageEntity. Could you elaborate more on this class? From my naive
>> > > > imagination, the lineage is shaped as a DAG, where vertices are
>> sources
>> > > and
>> > > > sinks (LineageEntity) and edges are connections between them
>> > > > (LineageRelation), so it is a bit confusing for a name mixing these
>> two
>> > > > concepts.
>> > > >
>> > > > 5. I can’t find the definition of CatalogContext in the current code
>> > base
>> > > > and Flink, which appears in the TableLineageEntity.
>> > > >
>> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>> > boolean
>> > > > (the “override” is quite confusing). I’m wondering if these are
>> > necessary
>> > > > for meta services, as they are actually concepts defined in the
>> runtime
>> > > > level of Flink Table / SQL.
>> > > >
>> > > > Best,
>> > > > Qingsheng
>> > > >
>> > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi devs,
>> > > > >
>> > > > > Is there any comment or feedback for this FLIP? Hope to hear from
>> > you,
>> > > > > thanks
>> > > > >
>> > > > > Best,
>> > > > > Shammon FY
>> > > > >
>> > > > >
>> > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com>
>> wrote:
>> > > > >
>> > > > > > Hi devs,
>> > > > > >
>> > > > > > I would like to start a discussion on FLIP-314: Support
>> Customized
>> > > Job
>> > > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2].
>> Flink
>> > > > > > streaming and batch jobs create lineage dependency between
>> source
>> > and
>> > > > > sink,
>> > > > > > users can manage their data and jobs according to this lineage
>> > > > > information.
>> > > > > > For example, when there is a delay in Flink ETL or data, users
>> can
>> > > > easily
>> > > > > > trace the problematic jobs and affected data. On the other hand,
>> > when
>> > > > > users
>> > > > > > need to correct data or debug, they can perform operations
>> based on
>> > > > > lineage
>> > > > > > too.
>> > > > > >
>> > > > > > In FLIP-314 we want to introduce lineage related interfaces for
>> > Flink
>> > > > and
>> > > > > > users can create customized job status listeners. When job
>> status
>> > > > > changes,
>> > > > > > users can get job status and information to add, update or
>> delete
>> > > > > lineage.
>> > > > > >
>> > > > > > Looking forward to your feedback, thanks.
>> > > > > >
>> > > > > >
>> > > > > > [1]
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>> > > > > > [2]
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>> > > > > >
>> > > > > > Best,
>> > > > > > Shammon FY
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to