jsingh-yelp opened a new pull request, #8001:
URL: https://github.com/apache/paimon/pull/8001
### Purpose

- Linked Issue: https://github.com/apache/paimon/issues/7306
- Add FLIP-314 lineage support for Paimon's non-Table (DataStream) APIs. Previously, lineage was emitted only for Table API jobs. This PR adds `LineageVertexProvider` support for jobs that use `FlinkSourceBuilder` and `FlinkSink` directly.
- This pull request builds on the earlier work in https://github.com/apache/paimon/pull/7311.

### What is covered?

Component | Approach | Covers
-- | -- | --
DataStream Sources | `PaimonDataStreamSource` wraps any Paimon `Source` and implements `LineageVertexProvider` | `StaticFileStoreSource`, `ContinuousFileStoreSource`, `AlignedContinuousFileStoreSource`, `MonitorSource` (via `FlinkSourceBuilder`)
DataStream Sinks | `PaimonDiscardingSink` extends `DiscardingSink` with lineage | All `FlinkSink` subclasses (append, upsert, CDC fixed/dynamic bucket)
Format Table Sink | `FormatTableSink` implements `LineageVertexProvider` directly | `FlinkFormatTableDataStreamSink` (Parquet/ORC direct writes)

### What is still not covered?

Not covered | Reason
-- | --
Multi-table CDC sinks (`FlinkCdcMultiTableSink`) | Tables are created dynamically at runtime, while `getLineageVertex()` is called during graph construction, before the output tables are known. This can be revisited in a future improvement.
`CompactorSourceBuilder` / `SystemTableSource` | Internal utilities, not user-facing data pipelines.

### Tests

- Added tests for the changes in this PR.
- Manually tested the use cases I had readily available, with Paimon as both a source and a sink:
  - **Kafka to Paimon**:
    - Writes data from Kafka to Paimon; in this test case Paimon uses `KafkaSyncTableAction` → `FlinkSink`.
    - Tested with the fixed-bucket `CdcFixedBucketSink` (`bucket` = 1 or 4) and the dynamic-bucket `CdcDynamicBucketSink` (`bucket` = -1).
    - Example lineage event: https://i.fluffy.cc/6wvpTj2Q7p8XqpZq73JQ5H6V3pmCNSmJ.html#L54-96
  - **Paimon to Cassandra**:
    - Writes data from Paimon to Cassandra; in this test case Paimon uses `FlinkSourceBuilder` via the DataStream APIs.
    - Example lineage event: https://i.fluffy.cc/6wvpTj2Q7p8XqpZq73JQ5H6V3pmCNSmJ.html#L54-96

**Example Paimon table as it appears inside the lineage event**:

```json
{
  "namespace": "file:///tmp/paimon-warehouse",
  "name": "jaskaran_test.default_value",
  "facets": {
    "schema": {
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
      "_schemaURL": "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
      "fields": [
        { "name": "id", "type": "STRING NOT NULL" },
        { "name": "age", "type": "INT" },
        { "name": "source", "type": "STRING" }
      ]
    },
    "symlinks": {
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
      "identifiers": []
    },
    "config": {
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
      "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/DatasetFacet",
      "bucket": "-1",
      "catalog.warehouse": "file:///tmp/paimon-warehouse",
      "path": "file:/tmp/paimon-warehouse/jaskaran_test.db/default_value",
      "write-only": "true",
      "changelog-producer": "input",
      "partition-keys": "",
      "type": "paimon",
      "primary-keys": "id"
    }
  }
}
```
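The example event identifies the table by the warehouse URI (`namespace`), `<database>.<table>` (`name`), and a `config` facet that combines table options with a few derived keys. The following is a minimal, hypothetical sketch of assembling those three pieces. `PaimonDatasetSketch` is an illustrative name, not the PR's code. The key set is read off the example event rather than the implementation, and joining multiple primary or partition keys with commas is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Paimon or Flink: assembles the dataset identity
// seen in the example event (namespace, name, and "config" facet entries).
final class PaimonDatasetSketch {
    final String namespace;           // warehouse URI, e.g. file:///tmp/paimon-warehouse
    final String name;                // "<database>.<table>"
    final Map<String, String> config; // table options plus a few derived keys

    private PaimonDatasetSketch(String namespace, String name, Map<String, String> config) {
        this.namespace = namespace;
        this.name = name;
        this.config = config;
    }

    static PaimonDatasetSketch of(
            String warehouse,
            String database,
            String table,
            Map<String, String> tableOptions,
            List<String> primaryKeys,
            List<String> partitionKeys) {
        // Start from the table's own options (e.g. bucket, changelog-producer, write-only).
        Map<String, String> config = new LinkedHashMap<>(tableOptions);
        // Derived keys. Comma-joining is an assumption; an unpartitioned table
        // yields "" for partition-keys, as in the example event.
        config.put("type", "paimon");
        config.put("catalog.warehouse", warehouse);
        config.put("primary-keys", String.join(",", primaryKeys));
        config.put("partition-keys", String.join(",", partitionKeys));
        return new PaimonDatasetSketch(warehouse, database + "." + table, config);
    }
}
```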
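The *What is covered?* table describes `PaimonDataStreamSource` as a wrapper around any Paimon source. The following minimal sketch shows that decorator idea. It uses local stand-in interfaces instead of Flink's `Source` and FLIP-314 types so that it compiles on its own. The wrapper forwards calls to the wrapped source and also reports the table and the source's boundedness; our understanding is that FLIP-314's `SourceLineageVertex` exposes boundedness as well. `LineageAwareSource`, `SketchSource`, and the other names are hypothetical and are not the PR's classes.

```java
import java.util.List;

// Local stand-ins so the sketch compiles without Flink. They loosely mirror the
// shape of FLIP-314's LineageDataset / SourceLineageVertex / LineageVertexProvider
// but are NOT the real Flink interfaces.
interface SketchDataset {
    String namespace();
    String name();
}

enum SketchBoundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

interface SketchSourceVertex {
    SketchBoundedness boundedness();
    List<SketchDataset> datasets();
}

interface SketchVertexProvider {
    SketchSourceVertex getLineageVertex();
}

// Stand-in for a Flink Source; a real Source also exposes reader/enumerator
// factories and serializers, which the wrapper would forward the same way.
interface SketchSource {
    SketchBoundedness getBoundedness();
}

final class SimpleSketchDataset implements SketchDataset {
    private final String namespace;
    private final String name;

    SimpleSketchDataset(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    @Override public String namespace() { return namespace; }
    @Override public String name() { return name; }
}

// Hypothetical decorator: delegates Source calls and adds lineage, so the
// wrapped source classes themselves need no changes.
final class LineageAwareSource implements SketchSource, SketchVertexProvider {
    private final SketchSource delegate;
    private final SketchDataset table;

    LineageAwareSource(SketchSource delegate, SketchDataset table) {
        this.delegate = delegate;
        this.table = table;
    }

    @Override
    public SketchBoundedness getBoundedness() {
        return delegate.getBoundedness(); // pure delegation
    }

    @Override
    public SketchSourceVertex getLineageVertex() {
        final SketchBoundedness boundedness = delegate.getBoundedness();
        final List<SketchDataset> datasets = List.of(table);
        return new SketchSourceVertex() {
            @Override public SketchBoundedness boundedness() { return boundedness; }
            @Override public List<SketchDataset> datasets() { return datasets; }
        };
    }
}
```

Because lineage lives in a single wrapper, the concrete sources listed in the table stay untouched. A new source type would only need to be wrapped where `FlinkSourceBuilder` creates it.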
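The sink row of the coverage table and the multi-table limitation both depend on timing: `getLineageVertex()` is answered during graph construction. Our reading of the table is that every `FlinkSink` topology terminates in a discarding sink, so that is where lineage can be attached. The minimal sketch below rests on that assumption. It shows a terminal sink subclass that fixes its target tables in the constructor; for a multi-table CDC sink, that list would still be empty at that point. All type names (`SketchDiscardingSink`, `LineageDiscardingSink`, the `SinkSketch*` interfaces) are hypothetical stand-ins, not Paimon's `PaimonDiscardingSink` or Flink's `DiscardingSink`.

```java
import java.util.List;

// Local stand-ins (NOT Flink's lineage interfaces) so the sketch compiles on its own.
interface SinkSketchDataset {
    String namespace();
    String name();
}

interface SinkSketchVertex {
    List<SinkSketchDataset> datasets();
}

interface SinkSketchVertexProvider {
    SinkSketchVertex getLineageVertex();
}

// Stand-in for a discarding terminal sink: records reaching it are dropped,
// because the real work (write + commit) already happened upstream.
class SketchDiscardingSink<T> {
    void invoke(T record) {
        // intentionally empty
    }
}

// Hypothetical lineage-aware subclass. The target tables are fixed in the
// constructor, i.e. before the job graph is built, which is when
// getLineageVertex() must already be answerable.
final class LineageDiscardingSink<T> extends SketchDiscardingSink<T>
        implements SinkSketchVertexProvider {
    private final List<SinkSketchDataset> tables;

    LineageDiscardingSink(List<SinkSketchDataset> tables) {
        this.tables = List.copyOf(tables);
    }

    @Override
    public SinkSketchVertex getLineageVertex() {
        return () -> tables;
    }
}
```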
