Hi Shammon,

I like the idea in general and it will help to analysis the job lineages no 
matter FlinkSQL or Flink jar jobs in production environments.

For Qingsheng's concern, I'd like the name of JobType more than 
RuntimeExecutionMode, as the latter one is not easy to understand for users.

I have one more question on the lookup-join dim tables, it seems this FLIP does 
not touch them, and will them become part of the List<LineageEntity> sources()​ 
or adding another interface?

By the way, if you want to focus on job lineage instead of data column lineage 
in this FLIP, why we must introduce so many column-lineage related interface 
here?


Best
Yun Tang
________________________________
From: Shammon FY <zjur...@gmail.com>
Sent: Sunday, June 25, 2023 16:13
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Hi Qingsheng,

Thanks for your valuable feedback.

> 1. Is there any specific use case to expose the batch / streaming info to
listeners or meta services?

I agree with you that Flink is evolving towards batch-streaming
unification, but the lifecycle of them is different. If a job processes a
bound dataset, it will end after completing the data processing, otherwise,
it will run for a long time. In our scenario, we will regularly schedule
some Flink jobs to process bound dataset and update some job information to
the lineage information for the "batch" jobs such as scheduled timestamp,
execution duration when jobs are finished, which is different from
"streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
`existsUnboundedSource` in `StreamingGraph` and `StreamingGraphGenerator`
to determine `JobType` and disjoin jobs. We can mark `JobType` as
`PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what do
you think of it?

> 2. it’s better to be more specific here to tell users what information
they could expect to see here, instead of just a “job configuration” as
described in JavaDoc.

Thanks and I have updated the doc in FLIP.

> 3. About the IO executor in JobStatusChangedListenerFactory.Context.

I have updated the docs for io executor  in
`JobStatusChangedListenerFactory.Context`, it is a regular thread pool and
executes submitted tasks in parallel. Users can submit tasks to the
executor which ensures that the submitted task can be executed before the
job exits.

> 4. I don’t quite get the LineageRelationEntity, which is just a list of
LineageEntity.

In the initial idea, the `LineageRelationEntity` is used for `DataStream`
to set additional lineage information besides source. For example, there
are table and column lineages in SQL jobs. When we build a `DataStream` job
with table source and sink, we can add table lineage in the following
method.
```
public class DataStreamSink {
    public DataStreamSink setLineageSources(LineageEntity ... sources);
}
```
But we can not set column lineage for the above sink, and for the sake of
universality, we do not want to add a method similar to `addLineageColumn
(...)` in `DataStreamSink`. So I put this information into
LineageRelationEntity so that SQL and DataStream jobs can be consistent.
But as you mentioned, this approach does indeed lead to ambiguity and
complexity. So my current idea is to add the `setLineageRelation` method in
`DataStreamSink` directly without `LineageRelationEntity`, I have updated
the FLIP and please help to review it again, thanks.

> 5. I can’t find the definition of CatalogContext in the current code base
and Flink, which appears in the TableLineageEntity.

CatalogContext is defined in FLIP-294 and I have updated the FLIP

> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
(the “override” is quite confusing). I’m wondering if these are necessary
for meta services, as they are actually concepts defined in the runtime
level of Flink Table / SQL.

The information in `TableSinkLineageEntity` such as `ModifyType`,
`ChangelogMode` and `override` are mainly used for verification and
display. For example, Flink currently supports `INSERT`/`DELETE` and
`UPDATE`, we only want to report and update lineage for `INSERT` jobs in
our streaming & batch ETL, and display the `override` information on the UI.


Best,
Shammon FY


On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:

> Hi Shammon,
>
> Thanks for starting this FLIP! Data lineage is a very important topic,
> which has been missing for a long time in Flink. I have some questions
> about the FLIP.
>
> About events and listeners:
>
> 1. I’m not sure if it is necessary to expose JobType to in JobCreatedEvent.
> This is an internal class in flink-runtime, and I think the correct API
> should be RuntimeExecutionMode. Furthermore, I think the boundary of batch
> and streaming becomes much more vague as Flink is evolving towards
> batch-streaming unification, so I’m concerned about exposing JobType as a
> public API. Is there any specific use case to expose the batch / streaming
> info to listeners or meta services?
>
> 2. Currently JobCreatedEvent gives a Configuration, which is quite
> ambiguous. To be honest the configuration is quite a mess in Flink, so
> maybe it’s better to be more specific here to tell users what information
> they could expect to see here, instead of just a “job configuration” as
> described in JavaDoc.
>
> 3. JobStatusChangedListenerFactory.Context provides an IO executor. I think
> more information should be provided here, such as which thread model this
> executor could promise, and whether the user should care about concurrency
> issues. Otherwise I prefer not to give such an utility that no one dares to
> use safely, and leave it to users to choose their implementation.
>
> About lineage:
>
> 4. I don’t quite get the LineageRelationEntity, which is just a list of
> LineageEntity. Could you elaborate more on this class? From my naive
> imagination, the lineage is shaped as a DAG, where vertices are sources and
> sinks (LineageEntity) and edges are connections between them
> (LineageRelation), so it is a bit confusing for a name mixing these two
> concepts.
>
> 5. I can’t find the definition of CatalogContext in the current code base
> and Flink, which appears in the TableLineageEntity.
>
> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> (the “override” is quite confusing). I’m wondering if these are necessary
> for meta services, as they are actually concepts defined in the runtime
> level of Flink Table / SQL.
>
> Best,
> Qingsheng
>
> On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi devs,
> >
> > Is there any comment or feedback for this FLIP? Hope to hear from you,
> > thanks
> >
> > Best,
> > Shammon FY
> >
> >
> > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-314: Support Customized Job
> > > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > > streaming and batch jobs create lineage dependency between source and
> > sink,
> > > users can manage their data and jobs according to this lineage
> > information.
> > > For example, when there is a delay in Flink ETL or data, users can
> easily
> > > trace the problematic jobs and affected data. On the other hand, when
> > users
> > > need to correct data or debug, they can perform operations based on
> > lineage
> > > too.
> > >
> > > In FLIP-314 we want to introduce lineage related interfaces for Flink
> and
> > > users can create customized job status listeners. When job status
> > changes,
> > > users can get job status and information to add, update or delete
> > lineage.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > >
> > > Best,
> > > Shammon FY
> > >
> >
>

Reply via email to