This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new a2c08c5e [FLINK-34467] add lineage integration for jdbc connector
(#149)
a2c08c5e is described below
commit a2c08c5e3949c5115d166df239d2486def990a4b
Author: Peter Huang <[email protected]>
AuthorDate: Mon Nov 17 23:07:49 2025 -0800
[FLINK-34467] add lineage integration for jdbc connector (#149)
---
.gitignore | 1 +
.../4bca2274-65a9-4a61-81ef-767d58233ea0 | 1 +
.../d45c3af5-52c6-45fd-9926-75e75e77473a | 4 +
flink-connector-jdbc-core/pom.xml | 10 ++
.../flink/connector/jdbc/JdbcInputFormat.java | 23 +++-
.../jdbc/core/datastream/sink/JdbcSink.java | 17 ++-
.../jdbc/core/datastream/source/JdbcSource.java | 26 +++-
.../core/table/source/JdbcRowDataInputFormat.java | 23 +++-
.../table/source/JdbcRowDataLookupFunction.java | 39 ++++--
.../connections/SimpleJdbcConnectionProvider.java | 4 +
.../jdbc/internal/GenericJdbcSinkFunction.java | 9 +-
.../connector/jdbc/internal/JdbcOutputFormat.java | 20 +++-
.../executor/InsertOrUpdateJdbcExecutor.java | 5 +
.../executor/JdbcBatchStatementExecutor.java | 3 +
.../executor/KeyedBatchStatementExecutor.java | 5 +
.../executor/SimpleBatchStatementExecutor.java | 5 +
.../TableBufferReducedStatementExecutor.java | 5 +
.../executor/TableBufferedStatementExecutor.java | 5 +
.../TableInsertOrUpdateStatementExecutor.java | 5 +
.../executor/TableSimpleStatementExecutor.java | 5 +
.../jdbc/lineage/DefaultTypeDatasetFacet.java | 61 ++++++++++
.../flink/connector/jdbc/lineage/JdbcLocation.java | 118 ++++++++++++++++++
.../jdbc/lineage/JdbcLocationExtractorFactory.java | 38 ++++++
.../flink/connector/jdbc/lineage/JdbcUtils.java | 100 ++++++++++++++++
.../flink/connector/jdbc/lineage/LineageUtils.java | 133 +++++++++++++++++++++
.../connector/jdbc/lineage/TypeDatasetFacet.java | 29 +++++
.../statement/FieldNamedPreparedStatement.java | 7 ++
.../statement/FieldNamedPreparedStatementImpl.java | 12 +-
.../src/main/resources/META-INF/NOTICE | 11 ++
.../flink/connector/jdbc/JdbcInputFormatTest.java | 32 +++++
.../connector/jdbc/JdbcRowOutputFormatTest.java | 19 +++
.../core/datastream/sink/BaseJdbcSinkTest.java | 14 +++
.../core/datastream/source/JdbcSourceITCase.java | 23 ++++
.../jdbc/core/table/sink/JdbcOutputFormatTest.java | 34 ++++++
.../jdbc/internal/JdbcTableOutputFormatTest.java | 41 +++++++
.../connector/jdbc/lineage/JdbcUtilsTest.java | 62 ++++++++++
.../connector/jdbc/lineage/LineageUtilsTest.java | 54 +++++++++
.../connector/jdbc/lineage/TestJdbcExtractor.java | 25 ++++
.../lineage/TestJdbcLocationExtractorFactory.java | 30 +++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/CrateLocationExtractorFactory.java | 35 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/Db2LocationExtractorFactory.java | 35 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/MySqlLocationExtractorFactory.java | 35 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/OceanBaseLocationExtractorFactory.java | 35 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/OracleLocationExtractorFactory.java | 34 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/PostgresLocationExtractorFactory.java | 35 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/SqlServerLocationExtractorFactory.java | 35 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
.../lineage/TrinoLocationExtractorFactory.java | 35 ++++++
...ector.jdbc.lineage.JdbcLocationExtractorFactory | 16 +++
pom.xml | 13 ++
57 files changed, 1479 insertions(+), 15 deletions(-)
diff --git a/.gitignore b/.gitignore
index 5f0068cd..fd822cb7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ scalastyle-output.xml
.metadata
.settings
.project
+.java-version
.version.properties
filter.properties
logs.zip
diff --git
a/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0
b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0
new file mode 100644
index 00000000..3f29182c
--- /dev/null
+++
b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0
@@ -0,0 +1 @@
+Method
<org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()>
calls method
<org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()>
in (JdbcSource.java:215)
diff --git
a/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a
b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a
new file mode 100644
index 00000000..c248851c
--- /dev/null
+++
b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a
@@ -0,0 +1,4 @@
+org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String,
java.util.Properties): Returned leaf type
org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside
outside of package 'org.apache.flink..' or reside in any package ['..shaded..']
or annotated with @Public or annotated with @PublicEvolving or annotated with
@Deprecated
+org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned
leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not
satisfy: reside outside of package 'org.apache.flink..' or reside in any
package ['..shaded..'] or annotated with @Public or annotated with
@PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String,
java.util.Properties): Returned leaf type
org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside
outside of package 'org.apache.flink..' or reside in any package ['..shaded..']
or annotated with @Public or annotated with @PublicEvolving or annotated with
@Deprecated
+org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String,
java.util.Properties): Returned leaf type
org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside
outside of package 'org.apache.flink..' or reside in any package ['..shaded..']
or annotated with @Public or annotated with @PublicEvolving or annotated with
@Deprecated
diff --git a/flink-connector-jdbc-core/pom.xml
b/flink-connector-jdbc-core/pom.xml
index 43d84449..c3142390 100644
--- a/flink-connector-jdbc-core/pom.xml
+++ b/flink-connector-jdbc-core/pom.xml
@@ -56,6 +56,16 @@ under the License.
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>io.openlineage</groupId>
+ <artifactId>openlineage-sql-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openlineage</groupId>
+ <artifactId>openlineage-java</artifactId>
+ </dependency>
+
<!-- Tests -->
<dependency>
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
index dacf3c6c..fb8b11fa 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
@@ -24,6 +24,7 @@ import
org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
@@ -31,10 +32,15 @@ import
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
import
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
import
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -53,6 +59,8 @@ import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
/**
* InputFormat to read data from a database and generate Rows. The InputFormat
has to be configured
@@ -107,7 +115,7 @@ import java.util.Arrays;
@Deprecated
@Experimental
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
- implements ResultTypeQueryable<Row> {
+ implements LineageVertexProvider, ResultTypeQueryable<Row> {
protected static final long serialVersionUID = 2L;
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +352,19 @@ public class JdbcInputFormat extends RichInputFormat<Row,
InputSplit>
return new JdbcInputFormatBuilder();
}
+ @Override
+ public LineageVertex getLineageVertex() {
+ DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+ new DefaultTypeDatasetFacet(getProducedType());
+ Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate,
true);
+ String namespace = LineageUtils.namespaceOf(connectionProvider);
+ LineageDataset dataset =
+ LineageUtils.datasetOf(
+ nameOpt.orElse(""), namespace,
Arrays.asList(defaultTypeDatasetFacet));
+ return LineageUtils.sourceLineageVertexOf(
+ Boundedness.BOUNDED, Collections.singleton(dataset));
+ }
+
/** Builder for {@link JdbcInputFormat}. */
public static class JdbcInputFormatBuilder {
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder
connOptionsBuilder;
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
index 5753eb9d..5ccb6e49 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
@@ -37,11 +37,16 @@ import
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterSta
import
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
/**
* Flink Sink to produce data into a jdbc database.
@@ -50,7 +55,8 @@ import java.util.Collections;
*/
@PublicEvolving
public class JdbcSink<IN>
- implements Sink<IN>,
+ implements LineageVertexProvider,
+ Sink<IN>,
SupportsWriterState<IN, JdbcWriterState>,
SupportsCommitter<JdbcCommitable> {
@@ -120,4 +126,13 @@ public class JdbcSink<IN>
public SimpleVersionedSerializer<JdbcWriterState>
getWriterStateSerializer() {
return new JdbcWriterStateSerializer();
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ Optional<String> nameOpt =
LineageUtils.tableNameOf(queryStatement.query(), false);
+ String namespace = LineageUtils.namespaceOf(connectionProvider);
+ LineageDataset dataset =
+ LineageUtils.datasetOf(nameOpt.orElse(""), namespace,
Collections.emptyList());
+ return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
+ }
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
index 8efb732e..c861d676 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
@@ -35,25 +35,35 @@ import
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSou
import
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
import
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
import
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
import
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
import
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
import
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
import
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
import
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Objects;
+import java.util.Optional;
/** JDBC source. */
@PublicEvolving
public class JdbcSource<OUT>
- implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
+ implements LineageVertexProvider,
+ Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
ResultTypeQueryable<OUT> {
private final Boundedness boundedness;
@@ -200,4 +210,18 @@ public class JdbcSource<OUT>
&& deliveryGuarantee == that.deliveryGuarantee
&& Objects.equals(continuousUnBoundingSettings,
that.continuousUnBoundingSettings);
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+ new DefaultTypeDatasetFacet(getTypeInformation());
+ SqlTemplateSplitEnumerator enumerator =
+ (SqlTemplateSplitEnumerator)
sqlSplitEnumeratorProvider.create();
+ Optional<String> nameOpt =
LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true);
+ String namespace = LineageUtils.namespaceOf(connectionProvider);
+ LineageDataset dataset =
+ LineageUtils.datasetOf(
+ nameOpt.orElse(""), namespace,
Arrays.asList(defaultTypeDatasetFacet));
+ return LineageUtils.sourceLineageVertexOf(boundedness,
Collections.singleton(dataset));
+ }
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
index b06f555d..de02666f 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
@@ -24,16 +24,22 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import
org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -51,11 +57,13 @@ import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
/** InputFormat for {@link JdbcDynamicTableSource}. */
@Internal
public class JdbcRowDataInputFormat extends RichInputFormat<RowData,
InputSplit>
- implements ResultTypeQueryable<RowData> {
+ implements LineageVertexProvider, ResultTypeQueryable<RowData> {
private static final long serialVersionUID = 2L;
private static final Logger LOG =
LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
@@ -296,6 +304,19 @@ public class JdbcRowDataInputFormat extends
RichInputFormat<RowData, InputSplit>
return new Builder();
}
+ @Override
+ public LineageVertex getLineageVertex() {
+ DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+ new DefaultTypeDatasetFacet(getProducedType());
+ Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate,
true);
+ String namespace = LineageUtils.namespaceOf(connectionProvider);
+ LineageDataset dataset =
+ LineageUtils.datasetOf(
+ nameOpt.orElse(""), namespace,
Arrays.asList(defaultTypeDatasetFacet));
+ return LineageUtils.sourceLineageVertexOf(
+ Boundedness.BOUNDED, Collections.singleton(dataset));
+ }
+
/** Builder for {@link JdbcRowDataInputFormat}. */
public static class Builder {
private JdbcConnectionOptions.JdbcConnectionOptionsBuilder
connOptionsBuilder;
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
index dd39fa27..1466df45 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
@@ -20,18 +20,26 @@ package org.apache.flink.connector.jdbc.core.table.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
import
org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.table.types.utils.TypeConversions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +54,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -53,7 +62,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
/** A lookup function for {@link JdbcDynamicTableSource}. */
@Internal
-public class JdbcRowDataLookupFunction extends LookupFunction {
+public class JdbcRowDataLookupFunction extends LookupFunction implements
LineageVertexProvider {
private static final Logger LOG =
LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
private static final long serialVersionUID = 2L;
@@ -67,6 +76,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction
{
private final List<String> resolvedPredicates;
private final Serializable[] pushdownParams;
+ private final RowType producedType;
private transient FieldNamedPreparedStatement statement;
@@ -106,12 +116,12 @@ public class JdbcRowDataLookupFunction extends
LookupFunction {
.getSelectFromStatement(options.getTableName(),
fieldNames, keyNames);
JdbcDialect jdbcDialect = options.getDialect();
this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);
- this.lookupKeyRowConverter =
- jdbcDialect.getRowConverter(
- RowType.of(
- Arrays.stream(keyTypes)
- .map(DataType::getLogicalType)
- .toArray(LogicalType[]::new)));
+ this.producedType =
+ RowType.of(
+ Arrays.stream(keyTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new));
+ this.lookupKeyRowConverter = jdbcDialect.getRowConverter(producedType);
this.resolvedPredicates = resolvedPredicates;
this.pushdownParams = pushdownParams;
}
@@ -224,4 +234,19 @@ public class JdbcRowDataLookupFunction extends
LookupFunction {
public Connection getDbConnection() {
return connectionProvider.getConnection();
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+ new DefaultTypeDatasetFacet(
+ LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+
TypeConversions.fromLogicalToDataType(producedType)));
+ Optional<String> nameOpt = LineageUtils.tableNameOf(query, true);
+ String namespace = LineageUtils.namespaceOf(connectionProvider);
+ LineageDataset dataset =
+ LineageUtils.datasetOf(
+ nameOpt.orElse(""), namespace,
Arrays.asList(defaultTypeDatasetFacet));
+ return LineageUtils.sourceLineageVertexOf(
+ Boundedness.BOUNDED, Collections.singleton(dataset));
+ }
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
index 4c48f799..3c0c00e4 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
@@ -149,4 +149,8 @@ public class SimpleJdbcConnectionProvider implements
JdbcConnectionProvider, Ser
closeConnection();
return getOrEstablishConnection();
}
+
+ public String getDbURL() {
+ return this.jdbcOptions.getDbURL();
+ }
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
index c031419d..d01afee8 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
@@ -27,6 +27,8 @@ import
org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -36,7 +38,7 @@ import java.io.IOException;
/** A generic SinkFunction for JDBC. */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
- implements CheckpointedFunction, InputTypeConfigurable {
+ implements LineageVertexProvider, CheckpointedFunction,
InputTypeConfigurable {
private final JdbcOutputFormat<T, ?, ?> outputFormat;
private JdbcOutputSerializer<T> serializer;
@@ -78,4 +80,9 @@ public class GenericJdbcSinkFunction<T> extends
RichSinkFunction<T>
((TypeInformation<T>) type)
.createSerializer(executionConfig.getSerializerConfig()));
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ return outputFormat.getLineageVertex();
+ }
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
index 7668236e..e02657bf 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
@@ -23,6 +23,10 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.SerializableSupplier;
@@ -36,6 +40,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -47,7 +53,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
/** A JDBC outputFormat that supports batching records before writing records
to database. */
@Internal
public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends
JdbcBatchStatementExecutor<JdbcIn>>
- implements Flushable, AutoCloseable, Serializable {
+ implements LineageVertexProvider, Flushable, AutoCloseable,
Serializable {
protected final JdbcConnectionProvider connectionProvider;
@@ -247,4 +253,16 @@ public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends
JdbcBatchStatementExe
public Connection getConnection() {
return connectionProvider.getConnection();
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ Optional<String> nameOpt =
+ jdbcStatementExecutor == null
+ ? Optional.empty()
+ :
LineageUtils.tableNameOf(jdbcStatementExecutor.insertSql(), false);
+ String namespace = LineageUtils.namespaceOf(connectionProvider);
+ LineageDataset dataset =
+ LineageUtils.datasetOf(nameOpt.orElse(""), namespace,
Collections.emptyList());
+ return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
+ }
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
index 838cbc52..bcf51961 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
@@ -108,6 +108,11 @@ public final class InsertOrUpdateJdbcExecutor<R, K, V>
implements JdbcBatchState
}
}
+ @Override
+ public String insertSql() {
+ return insertSQL;
+ }
+
private void processOneRowInBatch(K pk, V row) throws SQLException {
if (exist(pk)) {
updateSetter.accept(updateStatement, row);
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
index 3e52e87f..577f0422 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
@@ -40,6 +40,9 @@ public interface JdbcBatchStatementExecutor<T> {
/** Close JDBC related statements. */
void closeStatements() throws SQLException;
+ /** return the insert sql of the executor. */
+ String insertSql();
+
static <T, K> JdbcBatchStatementExecutor<T> keyed(
String sql, Function<T, K> keyExtractor, JdbcStatementBuilder<K>
statementBuilder) {
return new KeyedBatchStatementExecutor<>(sql, keyExtractor,
statementBuilder);
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
index c16f275f..e14eedd0 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
@@ -79,6 +79,11 @@ class KeyedBatchStatementExecutor<T, K> implements
JdbcBatchStatementExecutor<T>
}
}
+ @Override
+ public String insertSql() {
+ return sql;
+ }
+
@Override
public void closeStatements() throws SQLException {
if (st != null) {
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
index 253fdb42..76102a81 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -71,6 +71,11 @@ class SimpleBatchStatementExecutor<T> implements
JdbcBatchStatementExecutor<T> {
}
}
+ @Override
+ public String insertSql() {
+ return sql;
+ }
+
@Override
public void closeStatements() throws SQLException {
if (st != null) {
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
index fd062ccc..ac013d85 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
@@ -84,6 +84,11 @@ public final class TableBufferReducedStatementExecutor
}
}
+ @Override
+ public String insertSql() {
+ return upsertExecutor.insertSql();
+ }
+
@Override
public void executeBatch() throws SQLException {
if (!reduceBuffer.isEmpty()) {
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
index 7e5e50ec..17e2f5df 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
@@ -61,6 +61,11 @@ public final class TableBufferedStatementExecutor implements
JdbcBatchStatementE
}
}
+ @Override
+ public String insertSql() {
+ return statementExecutor.insertSql();
+ }
+
@Override
public void closeStatements() throws SQLException {
statementExecutor.closeStatements();
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
index 61ef4cc4..7df38535 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
@@ -82,6 +82,11 @@ public final class TableInsertOrUpdateStatementExecutor
processOneRowInBatch(keyExtractor.apply(record), record);
}
+ @Override
+ public String insertSql() {
+ return insertStatement.getQuery();
+ }
+
private void processOneRowInBatch(RowData pk, RowData row) throws
SQLException {
if (exist(pk)) {
updateSetter.toExternal(row, updateStatement);
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
index a3fc7b7e..88a1ad50 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
@@ -60,6 +60,11 @@ public final class TableSimpleStatementExecutor implements
JdbcBatchStatementExe
st.addBatch();
}
+ @Override
+ public String insertSql() {
+ return st.getQuery();
+ }
+
@Override
public void executeBatch() throws SQLException {
st.executeBatch();
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java
new file mode 100644
index 00000000..989340fd
--- /dev/null
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Objects;
+
+/** Default implementation of {@link TypeDatasetFacet}. */
+@PublicEvolving
+public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
+
+ public static final String TYPE_FACET_NAME = "type";
+
+ private final TypeInformation typeInformation;
+
+ public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
+ this.typeInformation = typeInformation;
+ }
+
+ public TypeInformation getTypeInformation() {
+ return typeInformation;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
+ return Objects.equals(typeInformation, that.typeInformation);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(typeInformation);
+ }
+
+ @Override
+ public String name() {
+ return TYPE_FACET_NAME;
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java
new file mode 100644
index 00000000..cf224da8
--- /dev/null
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+
+/** JDBC connection URL location. */
+@Internal
+public class JdbcLocation {
+ private final String scheme;
+ private final Optional<String> authority;
+ private final Optional<String> instance;
+ private final Optional<String> database;
+
+ private JdbcLocation(
+ String scheme,
+ Optional<String> authority,
+ Optional<String> instance,
+ Optional<String> database) {
+ this.scheme = scheme;
+ this.authority = authority;
+ this.instance = instance;
+ this.database = database;
+ }
+
+ public String toNamespace() {
+ String result = scheme.toLowerCase(Locale.ROOT) + ":";
+ if (authority.isPresent()) {
+ result = String.format("%s//%s", result,
authority.get().toLowerCase(Locale.ROOT));
+ }
+ if (instance.isPresent()) {
+ result = String.format("%s/%s", result,
StringUtils.stripStart(instance.get(), "/"));
+ }
+ return result;
+ }
+
+ public String toName(List<String> parts) {
+ if (database.isPresent()) {
+ parts.add(0, database.get());
+ }
+ return String.join(".", parts);
+ }
+
+ public String getScheme() {
+ return this.scheme;
+ }
+
+ public Optional<String> getAuthority() {
+ return this.authority;
+ }
+
+ public Optional<String> getInstance() {
+ return this.instance;
+ }
+
+ public Optional<String> getDatabase() {
+ return this.database;
+ }
+
+ public static JdbcLocation.Builder builder() {
+ return new JdbcLocation.Builder();
+ }
+
+ /** Builder for {@link JdbcLocation}. */
+ @PublicEvolving
+ public static final class Builder {
+ private String scheme = "";
+ private Optional<String> authority = Optional.empty();
+ private Optional<String> instance = Optional.empty();
+ private Optional<String> database = Optional.empty();
+
+ public Builder withScheme(String scheme) {
+ this.scheme = scheme;
+ return this;
+ }
+
+ public Builder withAuthority(Optional<String> authority) {
+ this.authority = authority;
+ return this;
+ }
+
+ public Builder withInstance(Optional<String> instance) {
+ this.instance = instance;
+ return this;
+ }
+
+ public Builder withDatabase(Optional<String> database) {
+ this.database = database;
+ return this;
+ }
+
+ public JdbcLocation build() {
+ return new JdbcLocation(scheme, authority, instance, database);
+ }
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java
new file mode 100644
index 00000000..ec5d9e8e
--- /dev/null
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/**
+ * A factory to create a specific {@link JdbcExtractor}. This factory is used
with Java's Service
+ * Provider Interfaces (SPI) for discovering.
+ *
+ * <p>Classes that implement this interface can be added to the
+ *
"META_INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory"
file of
+ * a JAR file in the current classpath to be found.
+ *
+ * @see JdbcExtractor
+ */
+@PublicEvolving
+public interface JdbcLocationExtractorFactory {
+
+ JdbcExtractor createExtractor();
+}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java
new file mode 100644
index 00000000..71775d8e
--- /dev/null
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+/** Utils for JDBC url preprocess and namespace extraction. */
+public class JdbcUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
+ private static final String SLASH_DELIMITER_USER_PASSWORD_REGEX =
+ "[A-Za-z0-9_%]+//?[A-Za-z0-9_%]*@";
+ private static final String COLON_DELIMITER_USER_PASSWORD_REGEX =
+ "([/|,])[A-Za-z0-9_%]+:?[A-Za-z0-9_%]*@";
+ private static final String PARAMS_USER_PASSWORD_REGEX =
+ "(?i)[,;&:]?(?:user|username|password)=[^,;&:()]+[,;&:]?";
+ private static final String DUPLICATED_DELIMITERS =
"(\\(\\)){2,}|[,;&:]{2,}";
+ private static final String QUERY_PARAMS_REGEX = "\\?.*$";
+
+ private static final List<JdbcExtractor> extractors = new ArrayList<>();
+
+ static {
+ for (JdbcLocationExtractorFactory factory :
+ ServiceLoader.load(JdbcLocationExtractorFactory.class)) {
+ extractors.add(factory.createExtractor());
+ }
+ }
+
+ /**
+ * Get JDBC namespace from JdbcUrl.
+ *
+ * @param jdbcUrl JDBC URL
+ * @param properties connection properties
+ * @return namespace String
+ */
+ public static String getJdbcNamespace(String jdbcUrl, Properties
properties) {
+ String uri = jdbcUrl.replaceAll("^(?i)jdbc:", "");
+ try {
+ JdbcExtractor extractor = getExtractor(uri);
+ return extractor.extract(uri, properties).toNamespace();
+ } catch (URISyntaxException e) {
+ LOG.debug("Failed to parse jdbc url", e);
+ return dropSensitiveData(uri);
+ }
+ }
+
+ /**
+ * Get the corresponding JdbcExtractor of the jdbc Url.
+ *
+ * @param jdbcUrl JDBC URL
+ * @return JdbcExtractor
+ * @throws URISyntaxException
+ */
+ public static JdbcExtractor getExtractor(String jdbcUrl) throws
URISyntaxException {
+ for (JdbcExtractor extractor : extractors) {
+ if (extractor.isDefinedAt(jdbcUrl)) {
+ return extractor;
+ }
+ }
+
+ throw new URISyntaxException(jdbcUrl, "Unsupported JDBC URL");
+ }
+
+ /**
+ * JdbcUrl can contain username and password this method clean-up
credentials from jdbcUrl. Also
+ * drop query params as they include a lot of useless options, like timeout
+ *
+ * @param jdbcUrl url to database
+ * @return String
+ */
+ private static String dropSensitiveData(String jdbcUrl) {
+ return jdbcUrl.replaceAll(SLASH_DELIMITER_USER_PASSWORD_REGEX, "@")
+ .replaceAll(COLON_DELIMITER_USER_PASSWORD_REGEX, "$1")
+ .replaceAll(PARAMS_USER_PASSWORD_REGEX, "")
+ .replaceAll(DUPLICATED_DELIMITERS, "")
+ .replaceAll(QUERY_PARAMS_REGEX, "");
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java
new file mode 100644
index 00000000..0a09671c
--- /dev/null
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import
org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import io.openlineage.sql.DbTableMeta;
+import io.openlineage.sql.OpenLineageSql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Utils for Lineage metadata extraction. */
+@PublicEvolving
+public class LineageUtils {
+
+ public static Optional<String> nameOf(
+ JdbcQueryStatement<?> jdbcQueryStatement, boolean isSource) {
+ if (!(jdbcQueryStatement instanceof SimpleJdbcQueryStatement)) {
+ return Optional.empty();
+ }
+
+ SimpleJdbcQueryStatement<?> simpleJdbcQueryStatement =
+ (SimpleJdbcQueryStatement<?>) jdbcQueryStatement;
+ return tableNameOf(simpleJdbcQueryStatement.query(), isSource);
+ }
+
+ public static Optional<String> tableNameOf(String query, boolean isSource)
{
+ return OpenLineageSql.parse(Arrays.asList(query))
+ .map(
+ sqlMeta ->
+ isSource
+ ?
getFirstQualifiedName(sqlMeta.inTables())
+ :
getFirstQualifiedName(sqlMeta.outTables()));
+ }
+
+ public static String namespaceOf(JdbcConnectionProvider
jdbcConnectionProvider) {
+ if (!(jdbcConnectionProvider instanceof SimpleJdbcConnectionProvider))
{
+ return "";
+ }
+
+ SimpleJdbcConnectionProvider simpleJdbcConnectionProvider =
+ (SimpleJdbcConnectionProvider) jdbcConnectionProvider;
+
+ return JdbcUtils.getJdbcNamespace(
+ simpleJdbcConnectionProvider.getDbURL(),
+ simpleJdbcConnectionProvider.getProperties());
+ }
+
+ public static LineageDataset datasetOf(
+ String name, String namespace, List<LineageDatasetFacet> facets) {
+
+ return new LineageDataset() {
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public String namespace() {
+ return namespace;
+ }
+
+ @Override
+ public Map<String, LineageDatasetFacet> facets() {
+ Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
+ facetMap.putAll(
+ facets.stream()
+ .collect(
+
Collectors.toMap(LineageDatasetFacet::name, item -> item)));
+ return facetMap;
+ }
+ };
+ }
+
+ public static LineageVertex lineageVertexOf(Collection<LineageDataset>
datasets) {
+ return new LineageVertex() {
+ @Override
+ public List<LineageDataset> datasets() {
+ return new ArrayList<>(datasets);
+ }
+ };
+ }
+
+ public static SourceLineageVertex sourceLineageVertexOf(
+ Boundedness boundedness, Collection<LineageDataset> datasets) {
+ return new SourceLineageVertex() {
+ @Override
+ public Boundedness boundedness() {
+ return boundedness;
+ }
+
+ @Override
+ public List<LineageDataset> datasets() {
+ return new ArrayList<>(datasets);
+ }
+ };
+ }
+
+ private static String getFirstQualifiedName(List<DbTableMeta> tableMetas) {
+ return tableMetas.isEmpty() ? "" : tableMetas.get(0).qualifiedName();
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java
new file mode 100644
index 00000000..4f343f39
--- /dev/null
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+/** Facet definition to contain type information of source and sink. */
+@PublicEvolving
+public interface TypeDatasetFacet extends LineageDatasetFacet {
+
+ TypeInformation<?> getTypeInformation();
+}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
index 85814ece..d8229783 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
@@ -83,6 +83,13 @@ public interface FieldNamedPreparedStatement extends
AutoCloseable {
connection, sql, fieldNames, additionalPredicates,
numberOfDynamicParams);
}
+ /**
+ * Returns the final prepared query.
+ *
+ * @return prepared query
+ */
+ String getQuery();
+
/**
* Clears the current parameter values immediately.
*
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
index fc05b90b..ea2bac1e 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
@@ -39,10 +39,18 @@ public class FieldNamedPreparedStatementImpl implements
FieldNamedPreparedStatem
private final PreparedStatement statement;
private final int[][] indexMapping;
+ private final String query;
- private FieldNamedPreparedStatementImpl(PreparedStatement statement,
int[][] indexMapping) {
+ private FieldNamedPreparedStatementImpl(
+ PreparedStatement statement, int[][] indexMapping, String query) {
this.statement = statement;
this.indexMapping = indexMapping;
+ this.query = query;
+ }
+
+ @Override
+ public String getQuery() {
+ return query;
}
@Override
@@ -221,7 +229,7 @@ public class FieldNamedPreparedStatementImpl implements
FieldNamedPreparedStatem
}
return new FieldNamedPreparedStatementImpl(
- connection.prepareStatement(parsedSQL), indexMapping);
+ connection.prepareStatement(parsedSQL), indexMapping,
parsedSQL);
}
/**
diff --git a/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE
b/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE
new file mode 100644
index 00000000..0be718a0
--- /dev/null
+++ b/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,11 @@
+flink-connector-jdbc-core
+
+Copyright 2014-2024 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- io.openlineage:openlineage-sql-java:1.32.0
+- io.openlineage:openlineage-java:1.32.0
+
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
index f72434db..61b897e5 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
@@ -18,10 +18,14 @@
package org.apache.flink.connector.jdbc;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
import
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.AfterEach;
@@ -380,6 +384,34 @@ class JdbcInputFormatTest extends JdbcDataTestBase {
jdbcInputFormat.closeInputFormat();
}
+ @Test
+ void testGetLineageVertex() {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(getMetadata().getDriverClass())
+ .setDBUrl(getMetadata().getJdbcUrl())
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+ .finish();
+
+ SourceLineageVertex lineageVertex =
+ (SourceLineageVertex) jdbcInputFormat.getLineageVertex();
+
+ assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+ assertThat(lineageVertex.boundedness()).isEqualTo(Boundedness.BOUNDED);
+ LineageDataset lineageDataset = lineageVertex.datasets().get(0);
+ assertThat(lineageDataset.name()).isEqualTo("books");
+ assertThat(lineageDataset.namespace()).isEqualTo("derby:memory:test");
+ assertThat(lineageDataset.facets().size()).isEqualTo(1);
+ assertThat(lineageDataset.facets().size()).isEqualTo(1);
+ assertThat(
+ ((DefaultTypeDatasetFacet)
lineageDataset.facets().values().toArray()[0])
+ .getTypeInformation()
+ .getArity())
+ .isEqualTo(5);
+ }
+
private void verifySplit(InputSplit split, int expectedIDSum) throws
IOException {
int sum = 0;
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
index 4d8a20b6..305837e7 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.AfterEach;
@@ -282,6 +283,24 @@ class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
}
+ @Test
+ void testGetLineageVertex() throws Exception {
+ jdbcOutputFormat =
+ JdbcRowOutputFormat.buildJdbcOutputFormat()
+ .setDrivername(getMetadata().getDriverClass())
+ .setDBUrl(getMetadata().getJdbcUrl())
+ .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
+ .finish();
+ JdbcOutputSerializer<Row> serializer =
+
JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
+ jdbcOutputFormat.open(serializer);
+
+ LineageVertex lineageVertex = jdbcOutputFormat.getLineageVertex();
+ assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+
assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("newbooks");
+
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+ }
+
@Test
void testFlush() throws SQLException, IOException {
jdbcOutputFormat =
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
index 48fe94f1..d8ab0d71 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.junit.jupiter.api.Test;
@@ -108,6 +109,19 @@ public abstract class BaseJdbcSinkTest implements
DerbyTestBase {
assertResult(BOOKS);
}
+ @Test
+ public void testGetLineageVertex() {
+ JdbcSink<?> sink =
+ finishSink(
+ new JdbcSinkBuilder<BooksTable.BookEntry>()
+ .withQueryStatement(
+ TEST_TABLE.getInsertIntoQuery(),
+ TEST_TABLE.getStatementBuilder()));
+
+ LineageVertex lineageVertex = sink.getLineageVertex();
+ assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+ }
+
private void assertResult(List<BooksTable.BookEntry> expected) throws
SQLException {
assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(expected);
}
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
index a22e6859..0f20a9c8 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
@@ -20,11 +20,13 @@ package
org.apache.flink.connector.jdbc.core.datastream.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.jdbc.JdbcDataTestBase;
import
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -135,6 +137,27 @@ class JdbcSourceITCase extends JdbcDataTestBase implements
JdbcITCaseBase {
assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA);
}
+ @Test
+ void testGetLineageVertex() {
+ JdbcSource<TestEntry> jdbcSource =
+ JdbcSource.<TestEntry>builder()
+
.setTypeInformation(TypeInformation.of(TestEntry.class))
+ .setSql(sql + " where id >= ? and id <= ?")
+ .setJdbcParameterValuesProvider(
+ new JdbcGenericParameterValuesProvider(
+ new Serializable[][] {{1001, 1005},
{1006, 1010}}))
+ .setDBUrl(getMetadata().getJdbcUrl())
+ .setDriverName(getMetadata().getDriverClass())
+ .setResultExtractor(extractor)
+ .build();
+
+ SourceLineageVertex lineageVertex = (SourceLineageVertex)
jdbcSource.getLineageVertex();
+ assertThat(lineageVertex.boundedness()).isEqualTo(Boundedness.BOUNDED);
+ assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+ assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("books");
+
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+ }
+
/** A sink function to collect the records. */
static class TestingSinkFunction implements SinkFunction<TestEntry> {
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
index 32618d92..af549ec6 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
@@ -25,6 +25,7 @@ import
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
import
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -371,6 +372,39 @@ class JdbcOutputFormatTest extends JdbcDataTestBase {
}
}
+ @Test
+ void testGetLineageVertex() throws Exception {
+ InternalJdbcConnectionOptions jdbcOptions =
+ InternalJdbcConnectionOptions.builder()
+ .setDriverName(getMetadata().getDriverClass())
+ .setDBUrl(getMetadata().getJdbcUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+ JdbcDmlOptions dmlOptions =
+ JdbcDmlOptions.builder()
+ .withTableName(jdbcOptions.getTableName())
+ .withDialect(jdbcOptions.getDialect())
+ .withFieldNames(fieldNames)
+ .build();
+
+ outputFormat =
+ new JdbcOutputFormatBuilder()
+ .setJdbcOptions(jdbcOptions)
+ .setFieldDataTypes(fieldDataTypes)
+ .setJdbcDmlOptions(dmlOptions)
+
.setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
+ .build();
+
+ JdbcOutputSerializer<RowData> serializer =
+
JdbcOutputSerializer.of(getSerializer(TypeInformation.of(RowData.class), true));
+ outputFormat.open(serializer);
+
+ LineageVertex lineageVertex = outputFormat.getLineageVertex();
+ assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+
assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("newbooks");
+
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+ }
+
@Test
void testFlush() throws SQLException, IOException {
InternalJdbcConnectionOptions jdbcOptions =
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index ba986cf5..171536d0 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -25,6 +25,7 @@ import
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnecti
import
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.AfterEach;
@@ -121,6 +122,11 @@ public class JdbcTableOutputFormatTest extends
JdbcDataTestBase {
}
}
+ @Override
+ public String insertSql() {
+ return "";
+ }
+
@Override
public void prepareStatements(Connection
connection) {}
@@ -139,6 +145,11 @@ public class JdbcTableOutputFormatTest extends
JdbcDataTestBase {
}
}
+ @Override
+ public String insertSql() {
+ return "";
+ }
+
@Override
public void addToBatch(Row record) {}
@@ -213,6 +224,36 @@ public class JdbcTableOutputFormatTest extends
JdbcDataTestBase {
check(expected);
}
+ @Test
+ void testGetLineageVertex() throws Exception {
+ InternalJdbcConnectionOptions options =
+ InternalJdbcConnectionOptions.builder()
+ .setDBUrl(getMetadata().getJdbcUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+ JdbcDmlOptions dmlOptions =
+ JdbcDmlOptions.builder()
+ .withTableName(options.getTableName())
+ .withDialect(options.getDialect())
+ .withFieldNames(fieldNames)
+ .withKeyFields(keyFields)
+ .build();
+ format =
+ new TableJdbcUpsertOutputFormat(
+ new SimpleJdbcConnectionProvider(options),
+ dmlOptions,
+ JdbcExecutionOptions.defaults());
+
+ JdbcOutputSerializer<Row> serializer =
+
JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
+ format.open(serializer);
+
+ LineageVertex lineageVertex = format.getLineageVertex();
+ assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+
assertThat(lineageVertex.datasets().get(0).name()).isEqualTo(OUTPUT_TABLE);
+
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+ }
+
private void check(Row[] rows) throws SQLException {
check(rows, getMetadata().getJdbcUrl(), OUTPUT_TABLE, fieldNames);
}
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java
new file mode 100644
index 00000000..ad90e123
--- /dev/null
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link JdbcUtils}. */
+class JdbcUtilsTest {
+ private static final String TEST_JDBC_URL = "jdbc:test://localhost/testdb";
+ private static final String UNSUPPORTED_JDBC_URL =
"jdbc:unsupported://localhost:8990/testdb";
+
+ @Test
+ void testGetTestJdbcExtractor() throws Exception {
+ JdbcExtractor jdbcExtractor =
JdbcUtils.getExtractor("test://localhost/testdb");
+ assertThat(jdbcExtractor).isInstanceOf(TestJdbcExtractor.class);
+ }
+
+ @Test
+ void testGetUnsupportedJdbcExtractor() {
+ assertThatThrownBy(
+ () -> {
+
JdbcUtils.getExtractor("unsupported://localhost:8990/testdb");
+ })
+ .isInstanceOf(URISyntaxException.class)
+ .hasMessage("Unsupported JDBC URL:
unsupported://localhost:8990/testdb");
+ }
+
+ @Test
+ void testGetValidJdbcNamespace() {
+ String test = JdbcUtils.getJdbcNamespace(TEST_JDBC_URL, new
Properties());
+ assertThat(test).isEqualTo("test://localhost:10051");
+ }
+
+ @Test
+ void testUnsupportedJdbcNamespace() {
+ String test = JdbcUtils.getJdbcNamespace(UNSUPPORTED_JDBC_URL, new
Properties());
+ assertThat(test).isEqualTo("unsupported://localhost:8990/testdb");
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java
new file mode 100644
index 00000000..4597a7e4
--- /dev/null
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import
org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtils}. */
+class LineageUtilsTest {
+
+ @Test
+ void testGetNameFromJdbcQueryStatement() {
+ SimpleJdbcQueryStatement<String> simpleStatement =
+ new SimpleJdbcQueryStatement<String>("select * from
test_table", null);
+
+ Optional<String> tableNameOpt = LineageUtils.nameOf(simpleStatement,
true);
+ assertThat(tableNameOpt).isNotEmpty();
+ assertThat(tableNameOpt.get()).isEqualTo("test_table");
+ }
+
+ @Test
+ void testGetNamespaceFromJdbcConnectionProvider() {
+ JdbcConnectionOptions options =
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl("jdbc:test://localhost/test_db")
+ .build();
+ SimpleJdbcConnectionProvider provider = new
SimpleJdbcConnectionProvider(options);
+ String namespace = LineageUtils.namespaceOf(provider);
+ assertThat(namespace).isEqualTo("test://localhost:10051");
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java
new file mode 100644
index 00000000..84021db6
--- /dev/null
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java
@@ -0,0 +1,25 @@
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.GenericJdbcExtractor;
+import io.openlineage.client.utils.jdbc.JdbcLocation;
+import io.openlineage.client.utils.jdbc.OverridingJdbcExtractor;
+
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+/** Implementation of {@link io.openlineage.client.utils.jdbc.JdbcExtractor}
for test purpose. */
+public class TestJdbcExtractor extends GenericJdbcExtractor {
+ private OverridingJdbcExtractor delegate;
+
+ public TestJdbcExtractor() {
+ this.delegate = new OverridingJdbcExtractor("test", "10051");
+ }
+
+ public boolean isDefinedAt(String jdbcUri) {
+ return this.delegate.isDefinedAt(jdbcUri);
+ }
+
+ public JdbcLocation extract(String rawUri, Properties properties) throws
URISyntaxException {
+ return this.delegate.extract(rawUri, properties);
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java
new file mode 100644
index 00000000..70c2a6fe
--- /dev/null
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/** JdbcLocationExtractorFactory for {@link TestJdbcExtractor}. */
+public class TestJdbcLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new TestJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..00d34173
--- /dev/null
+++
b/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.lineage.TestJdbcLocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java
new file mode 100644
index 00000000..ad17dfd3
--- /dev/null
+++
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.cratedb.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.CrateJdbcExtractor;
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for CrateDB. */
+@Internal
+public class CrateLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new CrateJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..bcd4dfeb
--- /dev/null
+++
b/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.cratedb.database.lineage.CrateLocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java
b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java
new file mode 100644
index 00000000..d4a0de2d
--- /dev/null
+++
b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.db2.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.Db2JdbcExtractor;
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for DB2. */
+@Internal
+public class Db2LocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new Db2JdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..9f066f72
--- /dev/null
+++
b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.db2.database.lineage.Db2LocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java
new file mode 100644
index 00000000..d1c5e00e
--- /dev/null
+++
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.mysql.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.MySqlJdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for Mysql. */
+@Internal
+public class MySqlLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new MySqlJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..b681048a
--- /dev/null
+++
b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.mysql.database.lineage.MySqlLocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java
new file mode 100644
index 00000000..f60e504a
--- /dev/null
+++
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.OceanBaseJdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for OceanBase. */
+@Internal
+public class OceanBaseLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new OceanBaseJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..678ddb65
--- /dev/null
+++
b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.oceanbase.database.lineage.OceanBaseLocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java
b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java
new file mode 100644
index 00000000..7ace87a4
--- /dev/null
+++
b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oracle.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.OracleJdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for Oracle. */
+@Internal
+public class OracleLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new OracleJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..7428410d
--- /dev/null
+++
b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.oracle.database.lineage.OracleLocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java
new file mode 100644
index 00000000..4ecb74be
--- /dev/null
+++
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.postgres.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.PostgresJdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for Postgres. */
+@Internal
+public class PostgresLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new PostgresJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..0ea5acd7
--- /dev/null
+++
b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.postgres.database.lineage.PostgresLocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java
b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java
new file mode 100644
index 00000000..94fff01a
--- /dev/null
+++
b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sqlserver.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.SqlServerJdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for SqlServer. */
+@Internal
+public class SqlServerLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new SqlServerJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..2598cebb
--- /dev/null
+++
b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.sqlserver.database.lineage.SqlServerLocationExtractorFactory
\ No newline at end of file
diff --git
a/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java
b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java
new file mode 100644
index 00000000..f239f978
--- /dev/null
+++
b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.trino.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.TrinoJdbcExtractor;
+
+/** Implementation of {@link JdbcLocationExtractorFactory} for Trino. */
+@Internal
+public class TrinoLocationExtractorFactory implements
JdbcLocationExtractorFactory {
+
+ @Override
+ public JdbcExtractor createExtractor() {
+ return new TrinoJdbcExtractor();
+ }
+}
diff --git
a/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..8be9bbb7
--- /dev/null
+++
b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.trino.database.lineage.TrinoLocationExtractorFactory
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index fe2fc778..a21db8ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@ under the License.
<assertj.version>3.24.2</assertj.version>
<testcontainers.version>1.20.1</testcontainers.version>
<mockito.version>3.12.4</mockito.version>
+ <openlineage.version>1.32.0</openlineage.version>
<japicmp.referenceVersion>3.0.0-1.16</japicmp.referenceVersion>
@@ -253,6 +254,18 @@ under the License.
<version>${log4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.openlineage</groupId>
+ <artifactId>openlineage-sql-java</artifactId>
+ <version>${openlineage.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openlineage</groupId>
+ <artifactId>openlineage-java</artifactId>
+ <version>${openlineage.version}</version>
+ </dependency>
+
<!-- For dependency convergence -->
<dependency>
<groupId>com.fasterxml.jackson</groupId>