This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new a2c08c5e [FLINK-34467] add lineage integration for jdbc connector (#149)
a2c08c5e is described below

commit a2c08c5e3949c5115d166df239d2486def990a4b
Author: Peter Huang <[email protected]>
AuthorDate: Mon Nov 17 23:07:49 2025 -0800

    [FLINK-34467] add lineage integration for jdbc connector (#149)
---
 .gitignore                                         |   1 +
 .../4bca2274-65a9-4a61-81ef-767d58233ea0           |   1 +
 .../d45c3af5-52c6-45fd-9926-75e75e77473a           |   4 +
 flink-connector-jdbc-core/pom.xml                  |  10 ++
 .../flink/connector/jdbc/JdbcInputFormat.java      |  23 +++-
 .../jdbc/core/datastream/sink/JdbcSink.java        |  17 ++-
 .../jdbc/core/datastream/source/JdbcSource.java    |  26 +++-
 .../core/table/source/JdbcRowDataInputFormat.java  |  23 +++-
 .../table/source/JdbcRowDataLookupFunction.java    |  39 ++++--
 .../connections/SimpleJdbcConnectionProvider.java  |   4 +
 .../jdbc/internal/GenericJdbcSinkFunction.java     |   9 +-
 .../connector/jdbc/internal/JdbcOutputFormat.java  |  20 +++-
 .../executor/InsertOrUpdateJdbcExecutor.java       |   5 +
 .../executor/JdbcBatchStatementExecutor.java       |   3 +
 .../executor/KeyedBatchStatementExecutor.java      |   5 +
 .../executor/SimpleBatchStatementExecutor.java     |   5 +
 .../TableBufferReducedStatementExecutor.java       |   5 +
 .../executor/TableBufferedStatementExecutor.java   |   5 +
 .../TableInsertOrUpdateStatementExecutor.java      |   5 +
 .../executor/TableSimpleStatementExecutor.java     |   5 +
 .../jdbc/lineage/DefaultTypeDatasetFacet.java      |  61 ++++++++++
 .../flink/connector/jdbc/lineage/JdbcLocation.java | 118 ++++++++++++++++++
 .../jdbc/lineage/JdbcLocationExtractorFactory.java |  38 ++++++
 .../flink/connector/jdbc/lineage/JdbcUtils.java    | 100 ++++++++++++++++
 .../flink/connector/jdbc/lineage/LineageUtils.java | 133 +++++++++++++++++++++
 .../connector/jdbc/lineage/TypeDatasetFacet.java   |  29 +++++
 .../statement/FieldNamedPreparedStatement.java     |   7 ++
 .../statement/FieldNamedPreparedStatementImpl.java |  12 +-
 .../src/main/resources/META-INF/NOTICE             |  11 ++
 .../flink/connector/jdbc/JdbcInputFormatTest.java  |  32 +++++
 .../connector/jdbc/JdbcRowOutputFormatTest.java    |  19 +++
 .../core/datastream/sink/BaseJdbcSinkTest.java     |  14 +++
 .../core/datastream/source/JdbcSourceITCase.java   |  23 ++++
 .../jdbc/core/table/sink/JdbcOutputFormatTest.java |  34 ++++++
 .../jdbc/internal/JdbcTableOutputFormatTest.java   |  41 +++++++
 .../connector/jdbc/lineage/JdbcUtilsTest.java      |  62 ++++++++++
 .../connector/jdbc/lineage/LineageUtilsTest.java   |  54 +++++++++
 .../connector/jdbc/lineage/TestJdbcExtractor.java  |  25 ++++
 .../lineage/TestJdbcLocationExtractorFactory.java  |  30 +++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/CrateLocationExtractorFactory.java     |  35 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/Db2LocationExtractorFactory.java       |  35 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/MySqlLocationExtractorFactory.java     |  35 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/OceanBaseLocationExtractorFactory.java |  35 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/OracleLocationExtractorFactory.java    |  34 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/PostgresLocationExtractorFactory.java  |  35 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/SqlServerLocationExtractorFactory.java |  35 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 .../lineage/TrinoLocationExtractorFactory.java     |  35 ++++++
 ...ector.jdbc.lineage.JdbcLocationExtractorFactory |  16 +++
 pom.xml                                            |  13 ++
 57 files changed, 1479 insertions(+), 15 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5f0068cd..fd822cb7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ scalastyle-output.xml
 .metadata
 .settings
 .project
+.java-version
 .version.properties
 filter.properties
 logs.zip
diff --git a/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0
new file mode 100644
index 00000000..3f29182c
--- /dev/null
+++ b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0
@@ -0,0 +1 @@
+Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)
diff --git a/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a
new file mode 100644
index 00000000..c248851c
--- /dev/null
+++ b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a
@@ -0,0 +1,4 @@
+org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
diff --git a/flink-connector-jdbc-core/pom.xml b/flink-connector-jdbc-core/pom.xml
index 43d84449..c3142390 100644
--- a/flink-connector-jdbc-core/pom.xml
+++ b/flink-connector-jdbc-core/pom.xml
@@ -56,6 +56,16 @@ under the License.
             <optional>true</optional>
         </dependency>
 
+        <dependency>
+            <groupId>io.openlineage</groupId>
+            <artifactId>openlineage-sql-java</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openlineage</groupId>
+            <artifactId>openlineage-java</artifactId>
+        </dependency>
+
         <!-- Tests -->
 
         <dependency>
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
index dacf3c6c..fb8b11fa 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
@@ -31,10 +32,15 @@ import 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -53,6 +59,8 @@ import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
 
 /**
  * InputFormat to read data from a database and generate Rows. The InputFormat 
has to be configured
@@ -107,7 +115,7 @@ import java.util.Arrays;
 @Deprecated
 @Experimental
 public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
-        implements ResultTypeQueryable<Row> {
+        implements LineageVertexProvider, ResultTypeQueryable<Row> {
 
     protected static final long serialVersionUID = 2L;
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +352,19 @@ public class JdbcInputFormat extends RichInputFormat<Row, 
InputSplit>
         return new JdbcInputFormatBuilder();
     }
 
+    @Override
+    public LineageVertex getLineageVertex() {
+        DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+                new DefaultTypeDatasetFacet(getProducedType());
+        Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate, 
true);
+        String namespace = LineageUtils.namespaceOf(connectionProvider);
+        LineageDataset dataset =
+                LineageUtils.datasetOf(
+                        nameOpt.orElse(""), namespace, 
Arrays.asList(defaultTypeDatasetFacet));
+        return LineageUtils.sourceLineageVertexOf(
+                Boundedness.BOUNDED, Collections.singleton(dataset));
+    }
+
     /** Builder for {@link JdbcInputFormat}. */
     public static class JdbcInputFormatBuilder {
         private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder 
connOptionsBuilder;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
index 5753eb9d..5ccb6e49 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
@@ -37,11 +37,16 @@ import 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterSta
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Optional;
 
 /**
  * Flink Sink to produce data into a jdbc database.
@@ -50,7 +55,8 @@ import java.util.Collections;
  */
 @PublicEvolving
 public class JdbcSink<IN>
-        implements Sink<IN>,
+        implements LineageVertexProvider,
+                Sink<IN>,
                 SupportsWriterState<IN, JdbcWriterState>,
                 SupportsCommitter<JdbcCommitable> {
 
@@ -120,4 +126,13 @@ public class JdbcSink<IN>
     public SimpleVersionedSerializer<JdbcWriterState> 
getWriterStateSerializer() {
         return new JdbcWriterStateSerializer();
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        Optional<String> nameOpt = 
LineageUtils.tableNameOf(queryStatement.query(), false);
+        String namespace = LineageUtils.namespaceOf(connectionProvider);
+        LineageDataset dataset =
+                LineageUtils.datasetOf(nameOpt.orElse(""), namespace, 
Collections.emptyList());
+        return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
index 8efb732e..c861d676 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
@@ -35,25 +35,35 @@ import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSou
 import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Optional;
 
 /** JDBC source. */
 @PublicEvolving
 public class JdbcSource<OUT>
-        implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
+        implements LineageVertexProvider,
+                Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
                 ResultTypeQueryable<OUT> {
 
     private final Boundedness boundedness;
@@ -200,4 +210,18 @@ public class JdbcSource<OUT>
                 && deliveryGuarantee == that.deliveryGuarantee
                 && Objects.equals(continuousUnBoundingSettings, 
that.continuousUnBoundingSettings);
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+                new DefaultTypeDatasetFacet(getTypeInformation());
+        SqlTemplateSplitEnumerator enumerator =
+                (SqlTemplateSplitEnumerator) 
sqlSplitEnumeratorProvider.create();
+        Optional<String> nameOpt = 
LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true);
+        String namespace = LineageUtils.namespaceOf(connectionProvider);
+        LineageDataset dataset =
+                LineageUtils.datasetOf(
+                        nameOpt.orElse(""), namespace, 
Arrays.asList(defaultTypeDatasetFacet));
+        return LineageUtils.sourceLineageVertexOf(boundedness, 
Collections.singleton(dataset));
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
index b06f555d..de02666f 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
@@ -24,16 +24,22 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import 
org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 
@@ -51,11 +57,13 @@ import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
 
 /** InputFormat for {@link JdbcDynamicTableSource}. */
 @Internal
 public class JdbcRowDataInputFormat extends RichInputFormat<RowData, 
InputSplit>
-        implements ResultTypeQueryable<RowData> {
+        implements LineageVertexProvider, ResultTypeQueryable<RowData> {
 
     private static final long serialVersionUID = 2L;
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
@@ -296,6 +304,19 @@ public class JdbcRowDataInputFormat extends 
RichInputFormat<RowData, InputSplit>
         return new Builder();
     }
 
+    @Override
+    public LineageVertex getLineageVertex() {
+        DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+                new DefaultTypeDatasetFacet(getProducedType());
+        Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate, 
true);
+        String namespace = LineageUtils.namespaceOf(connectionProvider);
+        LineageDataset dataset =
+                LineageUtils.datasetOf(
+                        nameOpt.orElse(""), namespace, 
Arrays.asList(defaultTypeDatasetFacet));
+        return LineageUtils.sourceLineageVertexOf(
+                Boundedness.BOUNDED, Collections.singleton(dataset));
+    }
+
     /** Builder for {@link JdbcRowDataInputFormat}. */
     public static class Builder {
         private JdbcConnectionOptions.JdbcConnectionOptionsBuilder 
connOptionsBuilder;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
index dd39fa27..1466df45 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
@@ -20,18 +20,26 @@ package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +54,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -53,7 +62,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A lookup function for {@link JdbcDynamicTableSource}. */
 @Internal
-public class JdbcRowDataLookupFunction extends LookupFunction {
+public class JdbcRowDataLookupFunction extends LookupFunction implements 
LineageVertexProvider {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
     private static final long serialVersionUID = 2L;
@@ -67,6 +76,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction 
{
 
     private final List<String> resolvedPredicates;
     private final Serializable[] pushdownParams;
+    private final RowType producedType;
 
     private transient FieldNamedPreparedStatement statement;
 
@@ -106,12 +116,12 @@ public class JdbcRowDataLookupFunction extends 
LookupFunction {
                         .getSelectFromStatement(options.getTableName(), 
fieldNames, keyNames);
         JdbcDialect jdbcDialect = options.getDialect();
         this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);
-        this.lookupKeyRowConverter =
-                jdbcDialect.getRowConverter(
-                        RowType.of(
-                                Arrays.stream(keyTypes)
-                                        .map(DataType::getLogicalType)
-                                        .toArray(LogicalType[]::new)));
+        this.producedType =
+                RowType.of(
+                        Arrays.stream(keyTypes)
+                                .map(DataType::getLogicalType)
+                                .toArray(LogicalType[]::new));
+        this.lookupKeyRowConverter = jdbcDialect.getRowConverter(producedType);
         this.resolvedPredicates = resolvedPredicates;
         this.pushdownParams = pushdownParams;
     }
@@ -224,4 +234,19 @@ public class JdbcRowDataLookupFunction extends 
LookupFunction {
     public Connection getDbConnection() {
         return connectionProvider.getConnection();
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        DefaultTypeDatasetFacet defaultTypeDatasetFacet =
+                new DefaultTypeDatasetFacet(
+                        LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+                                
TypeConversions.fromLogicalToDataType(producedType)));
+        Optional<String> nameOpt = LineageUtils.tableNameOf(query, true);
+        String namespace = LineageUtils.namespaceOf(connectionProvider);
+        LineageDataset dataset =
+                LineageUtils.datasetOf(
+                        nameOpt.orElse(""), namespace, 
Arrays.asList(defaultTypeDatasetFacet));
+        return LineageUtils.sourceLineageVertexOf(
+                Boundedness.BOUNDED, Collections.singleton(dataset));
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
index 4c48f799..3c0c00e4 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
@@ -149,4 +149,8 @@ public class SimpleJdbcConnectionProvider implements 
JdbcConnectionProvider, Ser
         closeConnection();
         return getOrEstablishConnection();
     }
+
+    public String getDbURL() {
+        return this.jdbcOptions.getDbURL();
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
index c031419d..d01afee8 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -36,7 +38,7 @@ import java.io.IOException;
 /** A generic SinkFunction for JDBC. */
 @Internal
 public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
-        implements CheckpointedFunction, InputTypeConfigurable {
+        implements LineageVertexProvider, CheckpointedFunction, 
InputTypeConfigurable {
     private final JdbcOutputFormat<T, ?, ?> outputFormat;
     private JdbcOutputSerializer<T> serializer;
 
@@ -78,4 +80,9 @@ public class GenericJdbcSinkFunction<T> extends 
RichSinkFunction<T>
                         ((TypeInformation<T>) type)
                                 
.createSerializer(executionConfig.getSerializerConfig()));
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        return outputFormat.getLineageVertex();
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
index 7668236e..e02657bf 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
@@ -23,6 +23,10 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.lineage.LineageUtils;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.SerializableSupplier;
 
@@ -36,6 +40,8 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -47,7 +53,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /** A JDBC outputFormat that supports batching records before writing records 
to database. */
 @Internal
 public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends 
JdbcBatchStatementExecutor<JdbcIn>>
-        implements Flushable, AutoCloseable, Serializable {
+        implements LineageVertexProvider, Flushable, AutoCloseable, 
Serializable {
 
     protected final JdbcConnectionProvider connectionProvider;
 
@@ -247,4 +253,16 @@ public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends 
JdbcBatchStatementExe
     public Connection getConnection() {
         return connectionProvider.getConnection();
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        Optional<String> nameOpt =
+                jdbcStatementExecutor == null
+                        ? Optional.empty()
+                        : 
LineageUtils.tableNameOf(jdbcStatementExecutor.insertSql(), false);
+        String namespace = LineageUtils.namespaceOf(connectionProvider);
+        LineageDataset dataset =
+                LineageUtils.datasetOf(nameOpt.orElse(""), namespace, 
Collections.emptyList());
+        return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
index 838cbc52..bcf51961 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java
@@ -108,6 +108,11 @@ public final class InsertOrUpdateJdbcExecutor<R, K, V> 
implements JdbcBatchState
         }
     }
 
+    @Override
+    public String insertSql() {
+        return insertSQL;
+    }
+
     private void processOneRowInBatch(K pk, V row) throws SQLException {
         if (exist(pk)) {
             updateSetter.accept(updateStatement, row);
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
index 3e52e87f..577f0422 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java
@@ -40,6 +40,9 @@ public interface JdbcBatchStatementExecutor<T> {
     /** Close JDBC related statements. */
     void closeStatements() throws SQLException;
 
+    /** Returns the insert SQL of the executor. */
+    String insertSql();
+
     static <T, K> JdbcBatchStatementExecutor<T> keyed(
             String sql, Function<T, K> keyExtractor, JdbcStatementBuilder<K> 
statementBuilder) {
         return new KeyedBatchStatementExecutor<>(sql, keyExtractor, 
statementBuilder);
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
index c16f275f..e14eedd0 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java
@@ -79,6 +79,11 @@ class KeyedBatchStatementExecutor<T, K> implements 
JdbcBatchStatementExecutor<T>
         }
     }
 
+    @Override
+    public String insertSql() {
+        return sql;
+    }
+
     @Override
     public void closeStatements() throws SQLException {
         if (st != null) {
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
index 253fdb42..76102a81 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -71,6 +71,11 @@ class SimpleBatchStatementExecutor<T> implements 
JdbcBatchStatementExecutor<T> {
         }
     }
 
+    @Override
+    public String insertSql() {
+        return sql;
+    }
+
     @Override
     public void closeStatements() throws SQLException {
         if (st != null) {
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
index fd062ccc..ac013d85 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
@@ -84,6 +84,11 @@ public final class TableBufferReducedStatementExecutor
         }
     }
 
+    @Override
+    public String insertSql() {
+        return upsertExecutor.insertSql();
+    }
+
     @Override
     public void executeBatch() throws SQLException {
         if (!reduceBuffer.isEmpty()) {
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
index 7e5e50ec..17e2f5df 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
@@ -61,6 +61,11 @@ public final class TableBufferedStatementExecutor implements 
JdbcBatchStatementE
         }
     }
 
+    @Override
+    public String insertSql() {
+        return statementExecutor.insertSql();
+    }
+
     @Override
     public void closeStatements() throws SQLException {
         statementExecutor.closeStatements();
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
index 61ef4cc4..7df38535 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
@@ -82,6 +82,11 @@ public final class TableInsertOrUpdateStatementExecutor
         processOneRowInBatch(keyExtractor.apply(record), record);
     }
 
+    @Override
+    public String insertSql() {
+        return insertStatement.getQuery();
+    }
+
     private void processOneRowInBatch(RowData pk, RowData row) throws 
SQLException {
         if (exist(pk)) {
             updateSetter.toExternal(row, updateStatement);
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
index a3fc7b7e..88a1ad50 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
@@ -60,6 +60,11 @@ public final class TableSimpleStatementExecutor implements 
JdbcBatchStatementExe
         st.addBatch();
     }
 
+    @Override
+    public String insertSql() {
+        return st.getQuery();
+    }
+
     @Override
     public void executeBatch() throws SQLException {
         st.executeBatch();
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java
new file mode 100644
index 00000000..989340fd
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Objects;
+
+/** Default implementation of {@link TypeDatasetFacet}. */
+@PublicEvolving
+public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
+
+    public static final String TYPE_FACET_NAME = "type";
+
+    private final TypeInformation typeInformation;
+
+    public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
+        this.typeInformation = typeInformation;
+    }
+
+    public TypeInformation getTypeInformation() {
+        return typeInformation;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
+        return Objects.equals(typeInformation, that.typeInformation);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(typeInformation);
+    }
+
+    @Override
+    public String name() {
+        return TYPE_FACET_NAME;
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java
new file mode 100644
index 00000000..cf224da8
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+
+/** JDBC connection URL location. */
+@Internal
+public class JdbcLocation {
+    private final String scheme;
+    private final Optional<String> authority;
+    private final Optional<String> instance;
+    private final Optional<String> database;
+
+    private JdbcLocation(
+            String scheme,
+            Optional<String> authority,
+            Optional<String> instance,
+            Optional<String> database) {
+        this.scheme = scheme;
+        this.authority = authority;
+        this.instance = instance;
+        this.database = database;
+    }
+
+    public String toNamespace() {
+        String result = scheme.toLowerCase(Locale.ROOT) + ":";
+        if (authority.isPresent()) {
+            result = String.format("%s//%s", result, 
authority.get().toLowerCase(Locale.ROOT));
+        }
+        if (instance.isPresent()) {
+            result = String.format("%s/%s", result, 
StringUtils.stripStart(instance.get(), "/"));
+        }
+        return result;
+    }
+
+    public String toName(List<String> parts) {
+        if (database.isPresent()) {
+            parts.add(0, database.get());
+        }
+        return String.join(".", parts);
+    }
+
+    public String getScheme() {
+        return this.scheme;
+    }
+
+    public Optional<String> getAuthority() {
+        return this.authority;
+    }
+
+    public Optional<String> getInstance() {
+        return this.instance;
+    }
+
+    public Optional<String> getDatabase() {
+        return this.database;
+    }
+
+    public static JdbcLocation.Builder builder() {
+        return new JdbcLocation.Builder();
+    }
+
+    /** Builder for {@link JdbcLocation}. */
+    @PublicEvolving
+    public static final class Builder {
+        private String scheme = "";
+        private Optional<String> authority = Optional.empty();
+        private Optional<String> instance = Optional.empty();
+        private Optional<String> database = Optional.empty();
+
+        public Builder withScheme(String scheme) {
+            this.scheme = scheme;
+            return this;
+        }
+
+        public Builder withAuthority(Optional<String> authority) {
+            this.authority = authority;
+            return this;
+        }
+
+        public Builder withInstance(Optional<String> instance) {
+            this.instance = instance;
+            return this;
+        }
+
+        public Builder withDatabase(Optional<String> database) {
+            this.database = database;
+            return this;
+        }
+
+        public JdbcLocation build() {
+            return new JdbcLocation(scheme, authority, instance, database);
+        }
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java
new file mode 100644
index 00000000..ec5d9e8e
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/**
+ * A factory to create a specific {@link JdbcExtractor}. This factory is used 
with Java's Service
+ * Provider Interfaces (SPI) for discovering.
+ *
+ * <p>Classes that implement this interface can be added to the
+ * 
"META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory"
 file of
+ * a JAR file in the current classpath to be found.
+ *
+ * @see JdbcExtractor
+ */
+@PublicEvolving
+public interface JdbcLocationExtractorFactory {
+
+    JdbcExtractor createExtractor();
+}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java
new file mode 100644
index 00000000..71775d8e
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+/** Utils for JDBC url preprocess and namespace extraction. */
+public class JdbcUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
+    private static final String SLASH_DELIMITER_USER_PASSWORD_REGEX =
+            "[A-Za-z0-9_%]+//?[A-Za-z0-9_%]*@";
+    private static final String COLON_DELIMITER_USER_PASSWORD_REGEX =
+            "([/|,])[A-Za-z0-9_%]+:?[A-Za-z0-9_%]*@";
+    private static final String PARAMS_USER_PASSWORD_REGEX =
+            "(?i)[,;&:]?(?:user|username|password)=[^,;&:()]+[,;&:]?";
+    private static final String DUPLICATED_DELIMITERS = 
"(\\(\\)){2,}|[,;&:]{2,}";
+    private static final String QUERY_PARAMS_REGEX = "\\?.*$";
+
+    private static final List<JdbcExtractor> extractors = new ArrayList<>();
+
+    static {
+        for (JdbcLocationExtractorFactory factory :
+                ServiceLoader.load(JdbcLocationExtractorFactory.class)) {
+            extractors.add(factory.createExtractor());
+        }
+    }
+
+    /**
+     * Get JDBC namespace from JdbcUrl.
+     *
+     * @param jdbcUrl JDBC URL
+     * @param properties connection properties
+     * @return namespace String
+     */
+    public static String getJdbcNamespace(String jdbcUrl, Properties 
properties) {
+        String uri = jdbcUrl.replaceAll("^(?i)jdbc:", "");
+        try {
+            JdbcExtractor extractor = getExtractor(uri);
+            return extractor.extract(uri, properties).toNamespace();
+        } catch (URISyntaxException e) {
+            LOG.debug("Failed to parse jdbc url", e);
+            return dropSensitiveData(uri);
+        }
+    }
+
+    /**
+     * Get the corresponding JdbcExtractor of the jdbc Url.
+     *
+     * @param jdbcUrl JDBC URL
+     * @return JdbcExtractor
+     * @throws URISyntaxException if no registered extractor is defined for the URL
+     */
+    public static JdbcExtractor getExtractor(String jdbcUrl) throws 
URISyntaxException {
+        for (JdbcExtractor extractor : extractors) {
+            if (extractor.isDefinedAt(jdbcUrl)) {
+                return extractor;
+            }
+        }
+
+        throw new URISyntaxException(jdbcUrl, "Unsupported JDBC URL");
+    }
+
+    /**
+     * A JDBC URL can contain a username and password; this method strips those 
credentials. Also
+     * drops query params, as they mostly carry irrelevant options such as timeouts.
+     *
+     * @param jdbcUrl url to database
+     * @return String
+     */
+    private static String dropSensitiveData(String jdbcUrl) {
+        return jdbcUrl.replaceAll(SLASH_DELIMITER_USER_PASSWORD_REGEX, "@")
+                .replaceAll(COLON_DELIMITER_USER_PASSWORD_REGEX, "$1")
+                .replaceAll(PARAMS_USER_PASSWORD_REGEX, "")
+                .replaceAll(DUPLICATED_DELIMITERS, "")
+                .replaceAll(QUERY_PARAMS_REGEX, "");
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java
new file mode 100644
index 00000000..0a09671c
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
+import 
org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import io.openlineage.sql.DbTableMeta;
+import io.openlineage.sql.OpenLineageSql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Utils for Lineage metadata extraction. */
+@PublicEvolving
+public class LineageUtils {
+
+    public static Optional<String> nameOf(
+            JdbcQueryStatement<?> jdbcQueryStatement, boolean isSource) {
+        if (!(jdbcQueryStatement instanceof SimpleJdbcQueryStatement)) {
+            return Optional.empty();
+        }
+
+        SimpleJdbcQueryStatement<?> simpleJdbcQueryStatement =
+                (SimpleJdbcQueryStatement<?>) jdbcQueryStatement;
+        return tableNameOf(simpleJdbcQueryStatement.query(), isSource);
+    }
+
+    public static Optional<String> tableNameOf(String query, boolean isSource) 
{
+        return OpenLineageSql.parse(Arrays.asList(query))
+                .map(
+                        sqlMeta ->
+                                isSource
+                                        ? 
getFirstQualifiedName(sqlMeta.inTables())
+                                        : 
getFirstQualifiedName(sqlMeta.outTables()));
+    }
+
+    public static String namespaceOf(JdbcConnectionProvider 
jdbcConnectionProvider) {
+        if (!(jdbcConnectionProvider instanceof SimpleJdbcConnectionProvider)) 
{
+            return "";
+        }
+
+        SimpleJdbcConnectionProvider simpleJdbcConnectionProvider =
+                (SimpleJdbcConnectionProvider) jdbcConnectionProvider;
+
+        return JdbcUtils.getJdbcNamespace(
+                simpleJdbcConnectionProvider.getDbURL(),
+                simpleJdbcConnectionProvider.getProperties());
+    }
+
+    public static LineageDataset datasetOf(
+            String name, String namespace, List<LineageDatasetFacet> facets) {
+
+        return new LineageDataset() {
+            @Override
+            public String name() {
+                return name;
+            }
+
+            @Override
+            public String namespace() {
+                return namespace;
+            }
+
+            @Override
+            public Map<String, LineageDatasetFacet> facets() {
+                Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
+                facetMap.putAll(
+                        facets.stream()
+                                .collect(
+                                        
Collectors.toMap(LineageDatasetFacet::name, item -> item)));
+                return facetMap;
+            }
+        };
+    }
+
+    public static LineageVertex lineageVertexOf(Collection<LineageDataset> 
datasets) {
+        return new LineageVertex() {
+            @Override
+            public List<LineageDataset> datasets() {
+                return new ArrayList<>(datasets);
+            }
+        };
+    }
+
+    public static SourceLineageVertex sourceLineageVertexOf(
+            Boundedness boundedness, Collection<LineageDataset> datasets) {
+        return new SourceLineageVertex() {
+            @Override
+            public Boundedness boundedness() {
+                return boundedness;
+            }
+
+            @Override
+            public List<LineageDataset> datasets() {
+                return new ArrayList<>(datasets);
+            }
+        };
+    }
+
+    private static String getFirstQualifiedName(List<DbTableMeta> tableMetas) {
+        return tableMetas.isEmpty() ? "" : tableMetas.get(0).qualifiedName();
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java
new file mode 100644
index 00000000..4f343f39
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+/** Facet definition to contain type information of source and sink. */
+@PublicEvolving
+public interface TypeDatasetFacet extends LineageDatasetFacet {
+
+    TypeInformation<?> getTypeInformation();
+}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
index 85814ece..d8229783 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
@@ -83,6 +83,13 @@ public interface FieldNamedPreparedStatement extends 
AutoCloseable {
                 connection, sql, fieldNames, additionalPredicates, 
numberOfDynamicParams);
     }
 
+    /**
+     * Returns the final prepared query.
+     *
+     * @return prepared query
+     */
+    String getQuery();
+
     /**
      * Clears the current parameter values immediately.
      *
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
index fc05b90b..ea2bac1e 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
@@ -39,10 +39,18 @@ public class FieldNamedPreparedStatementImpl implements 
FieldNamedPreparedStatem
 
     private final PreparedStatement statement;
     private final int[][] indexMapping;
+    private final String query;
 
-    private FieldNamedPreparedStatementImpl(PreparedStatement statement, 
int[][] indexMapping) {
+    private FieldNamedPreparedStatementImpl(
+            PreparedStatement statement, int[][] indexMapping, String query) {
         this.statement = statement;
         this.indexMapping = indexMapping;
+        this.query = query;
+    }
+
+    @Override
+    public String getQuery() {
+        return query;
     }
 
     @Override
@@ -221,7 +229,7 @@ public class FieldNamedPreparedStatementImpl implements 
FieldNamedPreparedStatem
         }
 
         return new FieldNamedPreparedStatementImpl(
-                connection.prepareStatement(parsedSQL), indexMapping);
+                connection.prepareStatement(parsedSQL), indexMapping, 
parsedSQL);
     }
 
     /**
diff --git a/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE 
b/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE
new file mode 100644
index 00000000..0be718a0
--- /dev/null
+++ b/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,11 @@
+flink-connector-jdbc-core
+
+Copyright 2014-2024 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- io.openlineage:openlineage-sql-java:1.32.0
+- io.openlineage:openlineage-java:1.32.0
+
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
index f72434db..61b897e5 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.connector.jdbc;
 
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
 import 
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import 
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.AfterEach;
@@ -380,6 +384,34 @@ class JdbcInputFormatTest extends JdbcDataTestBase {
         jdbcInputFormat.closeInputFormat();
     }
 
+    @Test
+    void testGetLineageVertex() {
+        jdbcInputFormat =
+                JdbcInputFormat.buildJdbcInputFormat()
+                        .setDrivername(getMetadata().getDriverClass())
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setQuery(SELECT_ALL_BOOKS)
+                        .setRowTypeInfo(ROW_TYPE_INFO)
+                        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+                        .finish();
+
+        SourceLineageVertex lineageVertex =
+                (SourceLineageVertex) jdbcInputFormat.getLineageVertex();
+
+        assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+        assertThat(lineageVertex.boundedness()).isEqualTo(Boundedness.BOUNDED);
+        LineageDataset lineageDataset = lineageVertex.datasets().get(0);
+        assertThat(lineageDataset.name()).isEqualTo("books");
+        assertThat(lineageDataset.namespace()).isEqualTo("derby:memory:test");
+        assertThat(lineageDataset.facets().size()).isEqualTo(1);
+        // Check the facet type explicitly so a mismatch fails as an assertion
+        // instead of surfacing as a ClassCastException from the cast below.
+        Object facet = lineageDataset.facets().values().toArray()[0];
+        assertThat(facet).isInstanceOf(DefaultTypeDatasetFacet.class);
+        assertThat(((DefaultTypeDatasetFacet) facet).getTypeInformation().getArity())
+                .isEqualTo(5);
+    }
+
     private void verifySplit(InputSplit split, int expectedIDSum) throws 
IOException {
         int sum = 0;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
index 4d8a20b6..305837e7 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.AfterEach;
@@ -282,6 +283,24 @@ class JdbcRowOutputFormatTest extends JdbcDataTestBase {
         }
     }
 
+    /**
+     * Checks the lineage vertex of an opened row output format: exactly one dataset is expected,
+     * named {@code newbooks} and placed in the {@code derby:memory:test} namespace.
+     */
+    @Test
+    void testGetLineageVertex() throws Exception {
+        jdbcOutputFormat =
+                JdbcRowOutputFormat.buildJdbcOutputFormat()
+                        .setDrivername(getMetadata().getDriverClass())
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
+                        .finish();
+        JdbcOutputSerializer<Row> serializer =
+                
JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
+        jdbcOutputFormat.open(serializer);
+
+        LineageVertex lineageVertex = jdbcOutputFormat.getLineageVertex();
+        assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+        
assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("newbooks");
+        
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+    }
+
     @Test
     void testFlush() throws SQLException, IOException {
         jdbcOutputFormat =
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
index 48fe94f1..d8ab0d71 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 
 import org.junit.jupiter.api.Test;
@@ -108,6 +109,19 @@ public abstract class BaseJdbcSinkTest implements 
DerbyTestBase {
         assertResult(BOOKS);
     }
 
+    /**
+     * Checks that a sink finished from the test table's insert query and statement builder
+     * exposes a lineage vertex containing exactly one dataset.
+     */
+    @Test
+    public void testGetLineageVertex() {
+        JdbcSink<?> sink =
+                finishSink(
+                        new JdbcSinkBuilder<BooksTable.BookEntry>()
+                                .withQueryStatement(
+                                        TEST_TABLE.getInsertIntoQuery(),
+                                        TEST_TABLE.getStatementBuilder()));
+
+        LineageVertex lineageVertex = sink.getLineageVertex();
+        assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+    }
+
     private void assertResult(List<BooksTable.BookEntry> expected) throws 
SQLException {
         
assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(expected);
     }
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
index a22e6859..0f20a9c8 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
@@ -20,11 +20,13 @@ package 
org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
 import 
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -135,6 +137,27 @@ class JdbcSourceITCase extends JdbcDataTestBase implements 
JdbcITCaseBase {
         assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA);
     }
 
+    /**
+     * Checks the lineage vertex of a source built from a parameterized query: it must be bounded
+     * and hold exactly one dataset, named {@code books}, in namespace {@code derby:memory:test}.
+     */
+    @Test
+    void testGetLineageVertex() {
+        JdbcSource<TestEntry> jdbcSource =
+                JdbcSource.<TestEntry>builder()
+                        
.setTypeInformation(TypeInformation.of(TestEntry.class))
+                        .setSql(sql + " where id >= ? and id <= ?")
+                        .setJdbcParameterValuesProvider(
+                                new JdbcGenericParameterValuesProvider(
+                                        new Serializable[][] {{1001, 1005}, 
{1006, 1010}}))
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setResultExtractor(extractor)
+                        .build();
+
+        // The vertex is read from the built source directly; no job is executed here.
+        SourceLineageVertex lineageVertex = (SourceLineageVertex) 
jdbcSource.getLineageVertex();
+        assertThat(lineageVertex.boundedness()).isEqualTo(Boundedness.BOUNDED);
+        assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+        assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("books");
+        
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+    }
+
     /** A sink function to collect the records. */
     static class TestingSinkFunction implements SinkFunction<TestEntry> {
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
index 32618d92..af549ec6 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -371,6 +372,39 @@ class JdbcOutputFormatTest extends JdbcDataTestBase {
         }
     }
 
+    /**
+     * Checks the lineage vertex of an opened table output format configured through connection
+     * and DML options: exactly one dataset is expected, named {@code newbooks}, in namespace
+     * {@code derby:memory:test}.
+     */
+    @Test
+    void testGetLineageVertex() throws Exception {
+        InternalJdbcConnectionOptions jdbcOptions =
+                InternalJdbcConnectionOptions.builder()
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setTableName(OUTPUT_TABLE)
+                        .build();
+        // The DML options reuse the table name and dialect of the connection options.
+        JdbcDmlOptions dmlOptions =
+                JdbcDmlOptions.builder()
+                        .withTableName(jdbcOptions.getTableName())
+                        .withDialect(jdbcOptions.getDialect())
+                        .withFieldNames(fieldNames)
+                        .build();
+
+        outputFormat =
+                new JdbcOutputFormatBuilder()
+                        .setJdbcOptions(jdbcOptions)
+                        .setFieldDataTypes(fieldDataTypes)
+                        .setJdbcDmlOptions(dmlOptions)
+                        
.setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
+                        .build();
+
+        JdbcOutputSerializer<RowData> serializer =
+                
JdbcOutputSerializer.of(getSerializer(TypeInformation.of(RowData.class), true));
+        outputFormat.open(serializer);
+
+        LineageVertex lineageVertex = outputFormat.getLineageVertex();
+        assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+        
assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("newbooks");
+        
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+    }
+
     @Test
     void testFlush() throws SQLException, IOException {
         InternalJdbcConnectionOptions jdbcOptions =
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index ba986cf5..171536d0 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnecti
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.AfterEach;
@@ -121,6 +122,11 @@ public class JdbcTableOutputFormatTest extends 
JdbcDataTestBase {
                                         }
                                     }
 
+                                    @Override
+                                    public String insertSql() {
+                                        // Stub implementation: always yields an empty string.
+                                        return "";
+                                    }
+
                                     @Override
                                     public void prepareStatements(Connection 
connection) {}
 
@@ -139,6 +145,11 @@ public class JdbcTableOutputFormatTest extends 
JdbcDataTestBase {
                                         }
                                     }
 
+                                    @Override
+                                    public String insertSql() {
+                                        // Stub implementation: always yields an empty string.
+                                        return "";
+                                    }
+
                                     @Override
                                     public void addToBatch(Row record) {}
 
@@ -213,6 +224,36 @@ public class JdbcTableOutputFormatTest extends 
JdbcDataTestBase {
         check(expected);
     }
 
+    /**
+     * Checks the lineage vertex of an opened upsert output format: exactly one dataset is
+     * expected, named after {@code OUTPUT_TABLE}, in namespace {@code derby:memory:test}.
+     */
+    @Test
+    void testGetLineageVertex() throws Exception {
+        InternalJdbcConnectionOptions options =
+                InternalJdbcConnectionOptions.builder()
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setTableName(OUTPUT_TABLE)
+                        .build();
+        // The DML options reuse the table name and dialect of the connection options.
+        JdbcDmlOptions dmlOptions =
+                JdbcDmlOptions.builder()
+                        .withTableName(options.getTableName())
+                        .withDialect(options.getDialect())
+                        .withFieldNames(fieldNames)
+                        .withKeyFields(keyFields)
+                        .build();
+        format =
+                new TableJdbcUpsertOutputFormat(
+                        new SimpleJdbcConnectionProvider(options),
+                        dmlOptions,
+                        JdbcExecutionOptions.defaults());
+
+        JdbcOutputSerializer<Row> serializer =
+                
JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
+        format.open(serializer);
+
+        LineageVertex lineageVertex = format.getLineageVertex();
+        assertThat(lineageVertex.datasets().size()).isEqualTo(1);
+        
assertThat(lineageVertex.datasets().get(0).name()).isEqualTo(OUTPUT_TABLE);
+        
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test");
+    }
+
     private void check(Row[] rows) throws SQLException {
         check(rows, getMetadata().getJdbcUrl(), OUTPUT_TABLE, fieldNames);
     }
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java
new file mode 100644
index 00000000..ad90e123
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link JdbcUtils}.
+ *
+ * <p>The cases use a {@code test} scheme, which is expected to resolve to {@link
+ * TestJdbcExtractor}, and an {@code unsupported} scheme for which no extractor is expected.
+ */
+class JdbcUtilsTest {
+    private static final String TEST_JDBC_URL = "jdbc:test://localhost/testdb";
+    private static final String UNSUPPORTED_JDBC_URL = 
"jdbc:unsupported://localhost:8990/testdb";
+
+    /** A {@code test://} URL (passed without the {@code jdbc:} prefix) yields the test extractor. */
+    @Test
+    void testGetTestJdbcExtractor() throws Exception {
+        JdbcExtractor jdbcExtractor = 
JdbcUtils.getExtractor("test://localhost/testdb");
+        assertThat(jdbcExtractor).isInstanceOf(TestJdbcExtractor.class);
+    }
+
+    /** An unknown scheme is rejected with a {@link URISyntaxException} naming the URL. */
+    @Test
+    void testGetUnsupportedJdbcExtractor() {
+        assertThatThrownBy(
+                        () -> {
+                            
JdbcUtils.getExtractor("unsupported://localhost:8990/testdb");
+                        })
+                .isInstanceOf(URISyntaxException.class)
+                .hasMessage("Unsupported JDBC URL: 
unsupported://localhost:8990/testdb");
+    }
+
+    /** A supported URL without a port is expected to map to a namespace with port 10051. */
+    @Test
+    void testGetValidJdbcNamespace() {
+        String test = JdbcUtils.getJdbcNamespace(TEST_JDBC_URL, new 
Properties());
+        assertThat(test).isEqualTo("test://localhost:10051");
+    }
+
+    /** For an unsupported URL the expected namespace is the URL without its {@code jdbc:} prefix. */
+    @Test
+    void testUnsupportedJdbcNamespace() {
+        String test = JdbcUtils.getJdbcNamespace(UNSUPPORTED_JDBC_URL, new 
Properties());
+        assertThat(test).isEqualTo("unsupported://localhost:8990/testdb");
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java
new file mode 100644
index 00000000..4597a7e4
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtils}. */
+class LineageUtilsTest {
+
+    /** The table name {@code test_table} is expected to be derived from a simple select. */
+    @Test
+    void testGetNameFromJdbcQueryStatement() {
+        SimpleJdbcQueryStatement<String> simpleStatement =
+                new SimpleJdbcQueryStatement<String>("select * from 
test_table", null);
+
+        Optional<String> tableNameOpt = LineageUtils.nameOf(simpleStatement, 
true);
+        assertThat(tableNameOpt).isNotEmpty();
+        assertThat(tableNameOpt.get()).isEqualTo("test_table");
+    }
+
+    /** A provider for a {@code jdbc:test} URL without a port should map to port 10051. */
+    @Test
+    void testGetNamespaceFromJdbcConnectionProvider() {
+        JdbcConnectionOptions options =
+                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                        .withUrl("jdbc:test://localhost/test_db")
+                        .build();
+        SimpleJdbcConnectionProvider provider = new 
SimpleJdbcConnectionProvider(options);
+        String namespace = LineageUtils.namespaceOf(provider);
+        assertThat(namespace).isEqualTo("test://localhost:10051");
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java
new file mode 100644
index 00000000..84021db6
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java
@@ -0,0 +1,25 @@
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.GenericJdbcExtractor;
+import io.openlineage.client.utils.jdbc.JdbcLocation;
+import io.openlineage.client.utils.jdbc.OverridingJdbcExtractor;
+
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link io.openlineage.client.utils.jdbc.JdbcExtractor} for testing purposes.
+ *
+ * <p>Both operations are forwarded to an {@link OverridingJdbcExtractor} constructed with the
+ * arguments {@code "test"} and {@code "10051"} (presumably the scheme and the default port).
+ */
+public class TestJdbcExtractor extends GenericJdbcExtractor {
+    /** Extractor that performs the actual work; assigned once in the constructor. */
+    private final OverridingJdbcExtractor delegate;
+
+    public TestJdbcExtractor() {
+        this.delegate = new OverridingJdbcExtractor("test", "10051");
+    }
+
+    /** Delegates the applicability check for {@code jdbcUri} to the wrapped extractor. */
+    @Override
+    public boolean isDefinedAt(String jdbcUri) {
+        return this.delegate.isDefinedAt(jdbcUri);
+    }
+
+    /**
+     * Delegates location extraction to the wrapped extractor.
+     *
+     * @throws URISyntaxException if the wrapped extractor rejects {@code rawUri}
+     */
+    @Override
+    public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException {
+        return this.delegate.extract(rawUri, properties);
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java
new file mode 100644
index 00000000..70c2a6fe
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.lineage;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/**
+ * JdbcLocationExtractorFactory for {@link TestJdbcExtractor}.
+ *
+ * <p>Listed in the test resources' {@code META-INF/services} registration file for {@link
+ * JdbcLocationExtractorFactory}.
+ */
+public class TestJdbcLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    /** Creates a new {@link TestJdbcExtractor} on every invocation. */
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new TestJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..00d34173
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.lineage.TestJdbcLocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java
 
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java
new file mode 100644
index 00000000..ad17dfd3
--- /dev/null
+++ 
b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.cratedb.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.CrateJdbcExtractor;
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/**
+ * Implementation of {@link JdbcLocationExtractorFactory} for CrateDB.
+ *
+ * <p>Listed in this module's {@code META-INF/services} registration file for {@link
+ * JdbcLocationExtractorFactory}.
+ */
+@Internal
+public class CrateLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    /** Creates a new {@link CrateJdbcExtractor} on every invocation. */
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new CrateJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..bcd4dfeb
--- /dev/null
+++ 
b/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.cratedb.database.lineage.CrateLocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java
 
b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java
new file mode 100644
index 00000000..d4a0de2d
--- /dev/null
+++ 
b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.db2.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.Db2JdbcExtractor;
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+
+/**
+ * Implementation of {@link JdbcLocationExtractorFactory} for DB2.
+ *
+ * <p>Listed in this module's {@code META-INF/services} registration file for {@link
+ * JdbcLocationExtractorFactory}.
+ */
+@Internal
+public class Db2LocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    /** Creates a new {@link Db2JdbcExtractor} on every invocation. */
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new Db2JdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..9f066f72
--- /dev/null
+++ 
b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.db2.database.lineage.Db2LocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java
 
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java
new file mode 100644
index 00000000..d1c5e00e
--- /dev/null
+++ 
b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.mysql.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.MySqlJdbcExtractor;
+
+/** MySQL {@link JdbcLocationExtractorFactory}; creates a {@link MySqlJdbcExtractor}. */
+@Internal
+public class MySqlLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new MySqlJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..b681048a
--- /dev/null
+++ 
b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.mysql.database.lineage.MySqlLocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java
 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java
new file mode 100644
index 00000000..f60e504a
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oceanbase.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.OceanBaseJdbcExtractor;
+
+/** OceanBase {@link JdbcLocationExtractorFactory}; creates an {@link OceanBaseJdbcExtractor}. */
+@Internal
+public class OceanBaseLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new OceanBaseJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..678ddb65
--- /dev/null
+++ 
b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.oceanbase.database.lineage.OceanBaseLocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java
 
b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java
new file mode 100644
index 00000000..7ace87a4
--- /dev/null
+++ 
b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.oracle.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.OracleJdbcExtractor;
+
+/** Oracle {@link JdbcLocationExtractorFactory}; creates an {@link OracleJdbcExtractor}. */
+@Internal
+public class OracleLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new OracleJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..7428410d
--- /dev/null
+++ 
b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.oracle.database.lineage.OracleLocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java
 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java
new file mode 100644
index 00000000..4ecb74be
--- /dev/null
+++ 
b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.postgres.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.PostgresJdbcExtractor;
+
+/** PostgreSQL {@link JdbcLocationExtractorFactory}; creates a {@link PostgresJdbcExtractor}. */
+@Internal
+public class PostgresLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new PostgresJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..0ea5acd7
--- /dev/null
+++ 
b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.postgres.database.lineage.PostgresLocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java
 
b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java
new file mode 100644
index 00000000..94fff01a
--- /dev/null
+++ 
b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.sqlserver.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.SqlServerJdbcExtractor;
+
+/** SQL Server {@link JdbcLocationExtractorFactory}; creates a {@link SqlServerJdbcExtractor}. */
+@Internal
+public class SqlServerLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new SqlServerJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..2598cebb
--- /dev/null
+++ 
b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.sqlserver.database.lineage.SqlServerLocationExtractorFactory
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java
 
b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java
new file mode 100644
index 00000000..f239f978
--- /dev/null
+++ 
b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.trino.database.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory;
+
+import io.openlineage.client.utils.jdbc.JdbcExtractor;
+import io.openlineage.client.utils.jdbc.TrinoJdbcExtractor;
+
+/** Trino {@link JdbcLocationExtractorFactory}; creates a {@link TrinoJdbcExtractor}. */
+@Internal
+public class TrinoLocationExtractorFactory implements 
JdbcLocationExtractorFactory {
+
+    @Override
+    public JdbcExtractor createExtractor() {
+        return new TrinoJdbcExtractor();
+    }
+}
diff --git 
a/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
 
b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
new file mode 100644
index 00000000..8be9bbb7
--- /dev/null
+++ 
b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.trino.database.lineage.TrinoLocationExtractorFactory
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index fe2fc778..a21db8ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@ under the License.
         <assertj.version>3.24.2</assertj.version>
         <testcontainers.version>1.20.1</testcontainers.version>
         <mockito.version>3.12.4</mockito.version>
+        <openlineage.version>1.32.0</openlineage.version>
 
         <japicmp.referenceVersion>3.0.0-1.16</japicmp.referenceVersion>
 
@@ -253,6 +254,18 @@ under the License.
                 <version>${log4j.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.openlineage</groupId>
+                <artifactId>openlineage-sql-java</artifactId>
+                <version>${openlineage.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>io.openlineage</groupId>
+                <artifactId>openlineage-java</artifactId>
+                <version>${openlineage.version}</version>
+            </dependency>
+
             <!-- For dependency convergence -->
             <dependency>
                 <groupId>com.fasterxml.jackson</groupId>

Reply via email to