[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781471#comment-17781471
 ] 

Maciej Obuchowski commented on FLINK-31275:
-------------------------------------------

[~zjureel] Sorry, it's probably too late for this comment as the FLIP is already accepted. 
But I'd still like to voice a concern here from the [OpenLineage|https://openlineage.io/] 
side, as we'd want to implement this interface.

It seems like the intended use of the LineageVertex interface is to just provide 
config context for particular nodes:

```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
}
```
and then, in the particular case where the listener understands a particular 
implementation, to provide more information:

```
// Create kafka source class with lineage vertex
public class KafkaVectorSource extends KafkaSource implements LineageVertexProvider {
    int capacity;
    String valueType;

    public LineageVertex LineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}
```
 
I think this is problematic because it strongly couples the listener to a 
particular vertex implementation.
If you want to get the list of datasets that are read by a particular Flink job, 
you'll have to understand where the config is coming from and its structure. 
Additionally, sometimes the config is not everything we need to get lineage - for 
example, for the Kafka connector we could get a regex pattern used for reading that 
we'd need to resolve ourselves.
Or, if the connector subclasses `LineageVertex`, then another option is to get 
additional information from the subclass - but still, the listener has to 
understand it.
Another problem is that the configuration structure for a particular connector 
can have breaking changes between versions - so we're tied not only to the 
connector, but also to a particular version of it.
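
To illustrate the coupling, here is a rough sketch of what a listener would have to do if config were the only input. The class name is hypothetical, and the `topic` / `topic-pattern` keys (with `;` as the list separator) are assumptions modeled on the Kafka SQL connector options, not something the FLIP defines:

```java
import java.util.List;
import java.util.Map;

// Hypothetical listener-side helper: it must know each connector's config keys.
final class ConfigParsingListener {

    /** Returns the Kafka topics named in the config, or an empty list if they cannot be determined. */
    static List<String> kafkaTopics(Map<String, String> config) {
        // Assumed key: an explicit, ';'-separated topic list.
        String topics = config.get("topic");
        if (topics != null) {
            return List.of(topics.split(";"));
        }
        // Assumed key "topic-pattern" holds only a regex. The listener cannot
        // turn it into concrete topics without querying the Kafka cluster itself.
        return List.of();
    }
}
```

If either key were renamed in a later connector version, this helper would silently return nothing, which is the versioning problem described above.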

But if we pushed the responsibility of understanding the datasets that a 
particular vertex of a graph produces to the connector itself, we would not have 
this problem.
First, the connector understands where it's reading from and writing to - so 
providing that information is easy for it. 
Second, the versioning problem does not exist - because the connector can 
update the code responsible for providing dataset information in the same PR that 
breaks the config structure, which will be transparent to the listener.


I would imagine the interface to be just something like this:


```
public interface LineageVertex {
    /* Config for the lineage vertex contains all the options for the connector. */
    Map<String, String> config();
    /* List of datasets that are consumed by this job */
    List<Dataset> inputs();
    /* List of datasets that are produced by this job */
    List<Dataset> outputs();
}
```
 
What a dataset is in this case is debatable: from the OL perspective it would be best 
if it were something similar to 
[https://openlineage.io/apidocs/javadoc/io/openlineage/client/openlineage.dataset]
 - with a name (e.g. table name) and a namespace (e.g. standardized database 
identifier). It also provides an extensible list of facets that represent 
additional information about the dataset that the particular connector wants 
to expose together with the dataset identifier - e.g. something that represents 
the table schema or the size of the dataset. It could be something Flink-specific, 
but it should allow particular connectors to expose the additional information.
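
A minimal sketch of how this could fit together, under stated assumptions: the `Dataset` shape (namespace, name, facet map), the `KafkaLineageVertex` class, the `kafka://` namespace format and the listener helper are all hypothetical and only illustrate the idea; none of them is part of FLIP-314 or of the OpenLineage client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical dataset shape: an identifier plus extensible facets.
final class Dataset {
    final String namespace;           // e.g. standardized database or cluster identifier
    final String name;                // e.g. table or topic name
    final Map<String, Object> facets; // optional extras such as a schema

    Dataset(String namespace, String name, Map<String, Object> facets) {
        this.namespace = namespace;
        this.name = name;
        this.facets = facets;
    }
}

// The interface proposed in this comment.
interface LineageVertex {
    Map<String, String> config();
    List<Dataset> inputs();
    List<Dataset> outputs();
}

// Hypothetical connector-side implementation: the connector knows its own topics.
final class KafkaLineageVertex implements LineageVertex {
    private final String bootstrapServers;
    private final List<String> topics;

    KafkaLineageVertex(String bootstrapServers, List<String> topics) {
        this.bootstrapServers = bootstrapServers;
        this.topics = topics;
    }

    @Override
    public Map<String, String> config() {
        return Map.of("bootstrap.servers", bootstrapServers);
    }

    @Override
    public List<Dataset> inputs() {
        List<Dataset> result = new ArrayList<>();
        for (String topic : topics) {
            // Assumed namespace format, for illustration only.
            result.add(new Dataset("kafka://" + bootstrapServers, topic, Map.of()));
        }
        return result;
    }

    @Override
    public List<Dataset> outputs() {
        return List.of(); // a pure source writes nothing
    }
}

// Hypothetical listener helper: no connector-specific knowledge required.
final class DatasetCollectingListener {
    static List<String> describeInputs(LineageVertex vertex) {
        List<String> names = new ArrayList<>();
        for (Dataset dataset : vertex.inputs()) {
            names.add(dataset.namespace + "/" + dataset.name);
        }
        return names;
    }
}
```

The listener only touches `inputs()` and the generic `Dataset` fields, so a connector can change its config layout without breaking it.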
 

> Flink supports reporting and storage of source/sink tables relationship
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31275
>                 URL: https://issues.apache.org/jira/browse/FLINK-31275
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>            Priority: Major
>
> FLIP-314 has been accepted 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
