This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8979b7acf [core][lineage] Introduce lineage meta interfaces (#1531)
8979b7acf is described below
commit 8979b7acfdf0c10f6526ee66f6306b83f5895729
Author: Shammon FY <[email protected]>
AuthorDate: Wed Aug 2 11:19:54 2023 +0800
[core][lineage] Introduce lineage meta interfaces (#1531)
* [core][lineage] Introduce lineage meta interfaces
---
.../generated/catalog_configuration.html | 6 +
.../apache/paimon/lineage/DataLineageEntity.java | 33 +++++
.../org/apache/paimon/lineage/LineageMeta.java | 103 ++++++++++++++
.../apache/paimon/lineage/LineageMetaFactory.java | 35 +++++
.../apache/paimon/lineage/TableLineageEntity.java | 32 +++++
.../paimon/lineage/TableLineageEntityImpl.java | 56 ++++++++
.../org/apache/paimon/options/CatalogOptions.java | 22 +++
.../org/apache/paimon/catalog/AbstractCatalog.java | 32 ++++-
.../paimon/table/AbstractFileStoreTable.java | 27 ++--
.../paimon/table/AppendOnlyFileStoreTable.java | 13 +-
.../apache/paimon/table/CatalogEnvironment.java | 63 +++++++++
.../table/ChangelogValueCountFileStoreTable.java | 12 +-
.../table/ChangelogWithKeyFileStoreTable.java | 13 +-
.../org/apache/paimon/table/FileStoreTable.java | 2 +
.../apache/paimon/table/FileStoreTableFactory.java | 40 +++---
.../apache/paimon/table/sink/TableCommitTest.java | 4 +-
...tinuousAppendAndCompactFollowUpScannerTest.java | 7 +-
.../table/source/snapshot/ScannerTestBase.java | 7 +-
.../paimon/flink/AbstractFlinkTableFactory.java | 43 +++++-
.../org/apache/paimon/flink/CatalogITCaseBase.java | 20 ++-
.../apache/paimon/flink/FlinkLineageITCase.java | 155 +++++++++++++++++++++
.../apache/paimon/flink/sink/FlinkSinkTest.java | 4 +-
.../services/org.apache.paimon.factories.Factory | 5 +-
23 files changed, 658 insertions(+), 76 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 118ecc44e..d243737a8 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -32,6 +32,12 @@ under the License.
<td>Boolean</td>
<td>Allow to fallback to hadoop File IO when no file io found for
the scheme.</td>
</tr>
+ <tr>
+ <td><h5>lineage-meta</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The lineage meta to store table and data lineage
information.<br /><br />Possible values:<br /><ul><li>"jdbc": Use standard jdbc
to store table and data lineage information.</li></ul><ul><li>"custom": You can
implement LineageMetaFactory and LineageMeta to store lineage information in
customized storage.</li></ul></td>
+ </tr>
<tr>
<td><h5>lock-acquire-timeout</h5></td>
<td style="word-wrap: break-word;">8 min</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
new file mode 100644
index 000000000..e7401a9be
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.data.Timestamp;
+
+/**
+ * Data lineage entity with table lineage, barrier id and snapshot id for
table source and sink
+ * lineage.
+ */
+public interface DataLineageEntity extends TableLineageEntity {
+ long getBarrierId();
+
+ long getSnapshotId();
+
+ Timestamp getCreateTime();
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
new file mode 100644
index 000000000..3fce04aaa
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.predicate.Predicate;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/** Metadata store will manage table lineage and data lineage information for
the catalog. */
+public interface LineageMeta extends Serializable {
+ /**
+ * Save the source table and job lineage.
+ *
+ * @param entity the table lineage entity
+ */
+ void saveSourceTableLineage(TableLineageEntity entity);
+
+ /**
+ * Delete the source table lineage for given job.
+ *
+ * @param job the job for table lineage
+ */
+ void deleteSourceTableLineage(String job);
+
+ /**
+ * Get source table and job lineages.
+ *
+ * @param predicate the predicate for the table lineages
+ * @return the iterator for source table and job lineages
+ */
+ Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate
predicate);
+
+ /**
+ * Save the sink table and job lineage.
+ *
+ * @param entity the table lineage entity
+ */
+ void saveSinkTableLineage(TableLineageEntity entity);
+
+ /**
+ * Get sink table and job lineages.
+ *
+ * @param predicate the predicate for the table lineages
+ * @return the iterator for sink table and job lineages
+ */
+ Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate
predicate);
+
+ /**
+ * Delete the sink table lineage for given job.
+ *
+ * @param job the job for table lineage
+ */
+ void deleteSinkTableLineage(String job);
+
+ /**
+ * Save the source data and job lineage.
+ *
+ * @param entity the data lineage entity
+ */
+ void saveSourceDataLineage(DataLineageEntity entity);
+
+ /**
+ * Get source data and job lineages.
+ *
+ * @param predicate the predicate for the data lineages
+ * @return the iterator for source data and job lineages
+ */
+ Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate
predicate);
+
+ /**
+ * Save the sink data and job lineage.
+ *
+ * @param entity the data lineage entity
+ */
+ void saveSinkDataLineage(DataLineageEntity entity);
+
+ /**
+ * Get sink data and job lineages.
+ *
+ * @param predicate the predicate for the data lineages
+ * @return the iterator for sink data and job lineages
+ */
+ Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate
predicate);
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
new file mode 100644
index 000000000..5cb7f8bae
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link LineageMeta}. Each factory should have a unique
identifier. */
+public interface LineageMetaFactory extends Factory {
+
+ LineageMeta create(LineageMetaContext context);
+
+ /**
+ * Context has all options in a catalog and is used in factory to create
{@link LineageMeta}.
+ */
+ interface LineageMetaContext {
+ Options options();
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
new file mode 100644
index 000000000..c4312c4eb
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.data.Timestamp;
+
+/** Table lineage entity with database, table and job for table source and
sink lineage. */
+public interface TableLineageEntity {
+ String getDatabase();
+
+ String getTable();
+
+ String getJob();
+
+ Timestamp getCreateTime();
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
new file mode 100644
index 000000000..ef11ee87f
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.data.Timestamp;
+
+/** Default implementation for {@link TableLineageEntity}. */
+public class TableLineageEntityImpl implements TableLineageEntity {
+ private final String database;
+ private final String table;
+ private final String job;
+ private final Timestamp timestamp;
+
+ public TableLineageEntityImpl(String database, String table, String job,
Timestamp timestamp) {
+ this.database = database;
+ this.table = table;
+ this.job = job;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getDatabase() {
+ return database;
+ }
+
+ @Override
+ public String getTable() {
+ return table;
+ }
+
+ @Override
+ public String getJob() {
+ return job;
+ }
+
+ @Override
+ public Timestamp getCreateTime() {
+ return timestamp;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index e853b67c5..0fba499dd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -18,6 +18,8 @@
package org.apache.paimon.options;
+import org.apache.paimon.options.description.Description;
+import org.apache.paimon.options.description.TextElement;
import org.apache.paimon.table.TableType;
import java.time.Duration;
@@ -75,4 +77,24 @@ public class CatalogOptions {
.defaultValue(true)
.withDescription(
"Allow to fallback to hadoop File IO when no file
io found for the scheme.");
+
+ public static final ConfigOption<String> LINEAGE_META =
+ key("lineage-meta")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The lineage meta to store table
and data lineage information.")
+ .linebreak()
+ .linebreak()
+ .text("Possible values:")
+ .linebreak()
+ .list(
+ TextElement.text(
+ "\"jdbc\": Use standard
jdbc to store table and data lineage information."))
+ .list(
+ TextElement.text(
+ "\"custom\": You can
implement LineageMetaFactory and LineageMeta to store lineage information in
customized storage."))
+ .build());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 38d3eef20..f958adda0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -19,10 +19,15 @@
package org.apache.paimon.catalog;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
@@ -30,11 +35,15 @@ import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.StringUtils;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
+
/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
@@ -46,13 +55,19 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
+ @Nullable protected final LineageMeta lineageMeta;
+
protected AbstractCatalog(FileIO fileIO) {
this.fileIO = fileIO;
+ this.lineageMeta = null;
this.tableDefaultOptions = new HashMap<>();
}
protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
this.fileIO = fileIO;
+ this.lineageMeta =
+ findAndCreateLineageMeta(
+ Options.fromMap(options),
AbstractCatalog.class.getClassLoader());
this.tableDefaultOptions = new HashMap<>();
options.keySet().stream()
@@ -64,6 +79,17 @@ public abstract class AbstractCatalog implements Catalog {
options.get(key)));
}
+ @Nullable
+ private LineageMeta findAndCreateLineageMeta(Options options, ClassLoader
classLoader) {
+ return options.getOptional(LINEAGE_META)
+ .map(
+ meta ->
+ FactoryUtil.discoverFactory(
+ classLoader,
LineageMetaFactory.class, meta)
+ .create(() -> options))
+ .orElse(null);
+ }
+
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
if (isSystemDatabase(identifier.getDatabaseName())) {
@@ -95,8 +121,10 @@ public abstract class AbstractCatalog implements Catalog {
fileIO,
getDataTableLocation(identifier),
tableSchema,
- Lock.factory(lockFactory().orElse(null), identifier),
- metastoreClientFactory(identifier).orElse(null));
+ new CatalogEnvironment(
+ Lock.factory(lockFactory().orElse(null), identifier),
+ metastoreClientFactory(identifier).orElse(null),
+ lineageMeta));
}
@VisibleForTesting
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 15bb81e61..597943b20 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -26,10 +26,8 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
@@ -57,8 +55,6 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
@@ -80,15 +76,13 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
protected final FileIO fileIO;
protected final Path path;
protected final TableSchema tableSchema;
- protected final Lock.Factory lockFactory;
- @Nullable protected final MetastoreClient.Factory metastoreClientFactory;
+ protected final CatalogEnvironment catalogEnvironment;
public AbstractFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
- Lock.Factory lockFactory,
- @Nullable MetastoreClient.Factory metastoreClientFactory) {
+ CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.path = path;
if (!tableSchema.options().containsKey(PATH.key())) {
@@ -98,8 +92,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
tableSchema = tableSchema.copy(newOptions);
}
this.tableSchema = tableSchema;
- this.lockFactory = lockFactory;
- this.metastoreClientFactory = metastoreClientFactory;
+ this.catalogEnvironment = catalogEnvironment;
}
@Override
@@ -107,6 +100,11 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
return store().bucketMode();
}
+ @Override
+ public CatalogEnvironment catalogEnvironment() {
+ return catalogEnvironment;
+ }
+
public RowKeyExtractor createRowKeyExtractor() {
switch (bucketMode()) {
case FIXED:
@@ -263,15 +261,18 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
coreOptions().writeOnly() ? null : store().newExpire(),
coreOptions().writeOnly() ? null :
store().newPartitionExpire(commitUser),
coreOptions().writeOnly() ? null :
store().newTagCreationManager(),
- lockFactory.create(),
+ catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path));
}
private List<CommitCallback> createCommitCallbacks() {
List<CommitCallback> callbacks = new
ArrayList<>(loadCommitCallbacks());
- if (coreOptions().partitionedTableInMetastore() &&
metastoreClientFactory != null) {
- callbacks.add(new
AddPartitionCommitCallback(metastoreClientFactory.create()));
+ if (coreOptions().partitionedTableInMetastore()
+ && catalogEnvironment.metastoreClientFactory() != null) {
+ callbacks.add(
+ new AddPartitionCommitCallback(
+
catalogEnvironment.metastoreClientFactory().create()));
}
return callbacks;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 787dbeff1..630a100e2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -26,7 +26,6 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -45,8 +44,6 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Preconditions;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.function.BiConsumer;
@@ -58,22 +55,20 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
private transient AppendOnlyFileStore lazyStore;
AppendOnlyFileStoreTable(FileIO fileIO, Path path, TableSchema
tableSchema) {
- this(fileIO, path, tableSchema, Lock.emptyFactory(), null);
+ this(fileIO, path, tableSchema, new
CatalogEnvironment(Lock.emptyFactory(), null, null));
}
AppendOnlyFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
- Lock.Factory lockFactory,
- @Nullable MetastoreClient.Factory metastoreClientFactory) {
- super(fileIO, path, tableSchema, lockFactory, metastoreClientFactory);
+ CatalogEnvironment catalogEnvironment) {
+ super(fileIO, path, tableSchema, catalogEnvironment);
}
@Override
protected FileStoreTable copy(TableSchema newTableSchema) {
- return new AppendOnlyFileStoreTable(
- fileIO, path, newTableSchema, lockFactory,
metastoreClientFactory);
+ return new AppendOnlyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
new file mode 100644
index 000000000..3fdaea2da
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.operation.Lock;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * Catalog environment in table which contains lock factory, metastore client
factory and lineage
+ * meta.
+ */
+public class CatalogEnvironment implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Lock.Factory lockFactory;
+ @Nullable private final MetastoreClient.Factory metastoreClientFactory;
+ @Nullable private final LineageMeta lineageMeta;
+
+ public CatalogEnvironment(
+ Lock.Factory lockFactory,
+ @Nullable MetastoreClient.Factory metastoreClientFactory,
+ @Nullable LineageMeta lineageMeta) {
+ this.lockFactory = lockFactory;
+ this.metastoreClientFactory = metastoreClientFactory;
+ this.lineageMeta = lineageMeta;
+ }
+
+ public Lock.Factory lockFactory() {
+ return lockFactory;
+ }
+
+ @Nullable
+ public MetastoreClient.Factory metastoreClientFactory() {
+ return metastoreClientFactory;
+ }
+
+ @Nullable
+ public LineageMeta lineageMeta() {
+ return lineageMeta;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index d515c5d7d..035816e1c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -29,7 +29,6 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.ValueCountMergeFunction;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.Lock;
@@ -49,8 +48,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import javax.annotation.Nullable;
-
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
@@ -64,22 +61,21 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
private transient KeyValueFileStore lazyStore;
ChangelogValueCountFileStoreTable(FileIO fileIO, Path path, TableSchema
tableSchema) {
- this(fileIO, path, tableSchema, Lock.emptyFactory(), null);
+ this(fileIO, path, tableSchema, new
CatalogEnvironment(Lock.emptyFactory(), null, null));
}
ChangelogValueCountFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
- Lock.Factory lockFactory,
- @Nullable MetastoreClient.Factory metastoreClientFactory) {
- super(fileIO, path, tableSchema, lockFactory, metastoreClientFactory);
+ CatalogEnvironment catalogEnvironment) {
+ super(fileIO, path, tableSchema, catalogEnvironment);
}
@Override
protected FileStoreTable copy(TableSchema newTableSchema) {
return new ChangelogValueCountFileStoreTable(
- fileIO, path, newTableSchema, lockFactory,
metastoreClientFactory);
+ fileIO, path, newTableSchema, catalogEnvironment);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 8c764bca4..624d9fc41 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -33,7 +33,6 @@ import
org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.Lock;
@@ -52,8 +51,6 @@ import
org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
-import javax.annotation.Nullable;
-
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -71,22 +68,20 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
private transient KeyValueFileStore lazyStore;
ChangelogWithKeyFileStoreTable(FileIO fileIO, Path path, TableSchema
tableSchema) {
- this(fileIO, path, tableSchema, Lock.emptyFactory(), null);
+ this(fileIO, path, tableSchema, new
CatalogEnvironment(Lock.emptyFactory(), null, null));
}
ChangelogWithKeyFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
- Lock.Factory lockFactory,
- @Nullable MetastoreClient.Factory metastoreClientFactory) {
- super(fileIO, path, tableSchema, lockFactory, metastoreClientFactory);
+ CatalogEnvironment catalogEnvironment) {
+ super(fileIO, path, tableSchema, catalogEnvironment);
}
@Override
protected FileStoreTable copy(TableSchema newTableSchema) {
- return new ChangelogWithKeyFileStoreTable(
- fileIO, path, newTableSchema, lockFactory,
metastoreClientFactory);
+ return new ChangelogWithKeyFileStoreTable(fileIO, path,
newTableSchema, catalogEnvironment);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 5206c00c6..860bb91be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -74,6 +74,8 @@ public interface FileStoreTable extends DataTable {
BucketMode bucketMode();
+ CatalogEnvironment catalogEnvironment();
+
@Override
FileStoreTable copy(Map<String, String> dynamicOptions);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 7427d1549..9f58a19c5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -23,14 +23,11 @@ import org.apache.paimon.WriteMode;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -66,21 +63,29 @@ public class FileStoreTableFactory {
"Schema file not found in
location "
+ tablePath
+ ". Please create
table first."));
- return create(fileIO, tablePath, tableSchema, options,
Lock.emptyFactory(), null);
+ return create(
+ fileIO,
+ tablePath,
+ tableSchema,
+ options,
+ new CatalogEnvironment(Lock.emptyFactory(), null, null));
}
public static FileStoreTable create(FileIO fileIO, Path tablePath,
TableSchema tableSchema) {
- return create(fileIO, tablePath, tableSchema, new Options(),
Lock.emptyFactory(), null);
+ return create(
+ fileIO,
+ tablePath,
+ tableSchema,
+ new Options(),
+ new CatalogEnvironment(Lock.emptyFactory(), null, null));
}
public static FileStoreTable create(
FileIO fileIO,
Path tablePath,
TableSchema tableSchema,
- Lock.Factory lockFactory,
- @Nullable MetastoreClient.Factory metastoreClientFactory) {
- return create(
- fileIO, tablePath, tableSchema, new Options(), lockFactory,
metastoreClientFactory);
+ CatalogEnvironment catalogEnvironment) {
+ return create(fileIO, tablePath, tableSchema, new Options(),
catalogEnvironment);
}
public static FileStoreTable create(
@@ -88,8 +93,7 @@ public class FileStoreTableFactory {
Path tablePath,
TableSchema tableSchema,
Options dynamicOptions,
- Lock.Factory lockFactory,
- @Nullable MetastoreClient.Factory metastoreClientFactory) {
+ CatalogEnvironment catalogEnvironment) {
FileStoreTable table;
Options coreOptions = Options.fromMap(tableSchema.options());
WriteMode writeMode = coreOptions.get(CoreOptions.WRITE_MODE);
@@ -103,24 +107,16 @@ public class FileStoreTableFactory {
if (writeMode == WriteMode.APPEND_ONLY) {
table =
new AppendOnlyFileStoreTable(
- fileIO, tablePath, tableSchema, lockFactory,
metastoreClientFactory);
+ fileIO, tablePath, tableSchema,
catalogEnvironment);
} else {
if (tableSchema.primaryKeys().isEmpty()) {
table =
new ChangelogValueCountFileStoreTable(
- fileIO,
- tablePath,
- tableSchema,
- lockFactory,
- metastoreClientFactory);
+ fileIO, tablePath, tableSchema,
catalogEnvironment);
} else {
table =
new ChangelogWithKeyFileStoreTable(
- fileIO,
- tablePath,
- tableSchema,
- lockFactory,
- metastoreClientFactory);
+ fileIO, tablePath, tableSchema,
catalogEnvironment);
}
}
return table.copy(dynamicOptions.toMap());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index fff72a787..d6847cb34 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
@@ -112,8 +113,7 @@ public class TableCommitTest {
new FailingFileIO(),
new Path(path),
tableSchema,
- Lock.emptyFactory(),
- null);
+ new CatalogEnvironment(Lock.emptyFactory(), null,
null));
String commitUser = UUID.randomUUID().toString();
StreamTableWrite write = table.newWrite(commitUser);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
index 9b498547d..e2b5d7362 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
@@ -144,6 +145,10 @@ public class ContinuousAppendAndCompactFollowUpScannerTest
extends ScannerTestBa
conf.toMap(),
""));
return FileStoreTableFactory.create(
- fileIO, tablePath, tableSchema, conf, Lock.emptyFactory(),
null);
+ fileIO,
+ tablePath,
+ tableSchema,
+ conf,
+ new CatalogEnvironment(Lock.emptyFactory(), null, null));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index 04eea4393..df1d2b533 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -33,6 +33,7 @@ import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.DataSplit;
@@ -138,7 +139,11 @@ public abstract class ScannerTestBase {
conf.toMap(),
""));
return FileStoreTableFactory.create(
- fileIO, tablePath, tableSchema, conf, Lock.emptyFactory(),
null);
+ fileIO,
+ tablePath,
+ tableSchema,
+ conf,
+ new CatalogEnvironment(Lock.emptyFactory(), null, null));
}
protected List<Split> toSplits(List<DataSplit> dataSplits) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 4ac6d542a..a9d72f003 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.LogConsistency;
import org.apache.paimon.CoreOptions.StreamingReadMode;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
@@ -30,8 +31,12 @@ import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.flink.source.table.PushedRichTableSource;
import org.apache.paimon.flink.source.table.PushedTableSource;
import org.apache.paimon.flink.source.table.RichTableSource;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.TableLineageEntity;
+import org.apache.paimon.lineage.TableLineageEntityImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
@@ -39,6 +44,7 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -48,12 +54,15 @@ import
org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.BiConsumer;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
@@ -82,6 +91,12 @@ public abstract class AbstractFlinkTableFactory
new SystemTableSource(((SystemCatalogTable)
origin).table(), isStreamingMode));
} else {
Table table = buildPaimonTable(context);
+ if (table instanceof FileStoreTable) {
+ storeTableLineage(
+ ((FileStoreTable)
table).catalogEnvironment().lineageMeta(),
+ context,
+ (entity, lineage) ->
lineage.saveSourceTableLineage(entity));
+ }
DataTableSource source =
new DataTableSource(
context.getObjectIdentifier(),
@@ -97,13 +112,39 @@ public abstract class AbstractFlinkTableFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
+ Table table = buildPaimonTable(context);
+ if (table instanceof FileStoreTable) {
+ storeTableLineage(
+ ((FileStoreTable)
table).catalogEnvironment().lineageMeta(),
+ context,
+ (entity, lineage) -> lineage.saveSinkTableLineage(entity));
+ }
return new FlinkTableSink(
context.getObjectIdentifier(),
- buildPaimonTable(context),
+ table,
context,
createOptionalLogStoreFactory(context).orElse(null));
}
+    /**
+     * Builds a {@link TableLineageEntity} for the table identified by {@code context} and hands
+     * it, together with {@code lineageMeta}, to {@code tableLineage}.
+     *
+     * <p>Nothing happens when {@code lineageMeta} is {@code null}.
+     *
+     * @param lineageMeta lineage meta to pass to the callback, or {@code null} to skip recording
+     * @param context factory context supplying the object identifier and the configuration
+     * @param tableLineage callback that receives the entity and the lineage meta
+     * @throws ValidationException if {@link PipelineOptions#NAME} is not set in the configuration
+     */
+    private void storeTableLineage(
+            @Nullable LineageMeta lineageMeta,
+            Context context,
+            BiConsumer<TableLineageEntity, LineageMeta> tableLineage) {
+        if (lineageMeta == null) {
+            return;
+        }
+
+        // The pipeline name is used as the job field of the lineage entity.
+        String jobName = context.getConfiguration().get(PipelineOptions.NAME);
+        if (jobName == null) {
+            throw new ValidationException("Cannot get pipeline name for lineage meta.");
+        }
+
+        TableLineageEntity entity =
+                new TableLineageEntityImpl(
+                        context.getObjectIdentifier().getDatabaseName(),
+                        context.getObjectIdentifier().getObjectName(),
+                        jobName,
+                        Timestamp.fromEpochMillis(System.currentTimeMillis()));
+        tableLineage.accept(entity, lineageMeta);
+    }
+
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index d49afc681..027f21716 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -51,7 +51,10 @@ import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
@@ -69,14 +72,17 @@ public abstract class CatalogITCaseBase extends
AbstractTestBase {
path = getTempDirPath();
String inferScan =
!inferScanParallelism() ?
",\n'table-default.scan.infer-parallelism'='false'" : "";
+
+ Map<String, String> options = new HashMap<>(catalogOptions());
+ options.put("type", "paimon");
+ options.put("warehouse", toWarehouse(path));
tEnv.executeSql(
String.format(
- "CREATE CATALOG %s WITH ("
- + "'type'='paimon', 'warehouse'='%s'"
- + inferScan
- + ")",
+ "CREATE CATALOG %s WITH (" + "%s" + inferScan + ")",
catalog,
- toWarehouse(path)));
+ options.entrySet().stream()
+ .map(e -> String.format("'%s'='%s'",
e.getKey(), e.getValue()))
+ .collect(Collectors.joining(","))));
tEnv.useCatalog(catalog);
sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
@@ -88,6 +94,10 @@ public abstract class CatalogITCaseBase extends
AbstractTestBase {
prepareEnv();
}
+ /**
+  * Extra options for the test catalog; empty by default.
+  *
+  * <p>The returned entries are copied into the option map used to build the CREATE CATALOG
+  * statement. The 'type' and 'warehouse' keys are put after these entries, so values
+  * supplied here for those two keys are overwritten.
+  */
+ protected Map<String, String> catalogOptions() {
+ return Collections.emptyMap();
+ }
+
protected boolean inferScanParallelism() {
return false;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
new file mode 100644
index 000000000..e31a73e6c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.lineage.DataLineageEntity;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.lineage.TableLineageEntity;
+import org.apache.paimon.predicate.Predicate;
+
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.table.api.ValidationException;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * ITCase for flink table and data lineage.
+ *
+ * <p>The test catalog is created with the {@code throwing-meta} lineage meta, whose methods all
+ * throw. A statement that reaches a lineage method therefore fails, and the test inspects the
+ * failure to tell which method was invoked.
+ */
+public class FlinkLineageITCase extends CatalogITCaseBase {
+    private static final String THROWING_META = "throwing-meta";
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)");
+    }
+
+    @Override
+    protected Map<String, String> catalogOptions() {
+        return Collections.singletonMap(LINEAGE_META.key(), THROWING_META);
+    }
+
+    @Test
+    public void testTableLineage() {
+        // Validate for source and sink lineage when pipeline name is null
+        assertThatThrownBy(
+                        () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await())
+                .hasCauseExactlyInstanceOf(ValidationException.class)
+                .hasRootCauseMessage("Cannot get pipeline name for lineage meta.");
+        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM T").collect().close())
+                .hasCauseExactlyInstanceOf(ValidationException.class)
+                .hasRootCauseMessage("Cannot get pipeline name for lineage meta.");
+
+        // Call saveSinkTableLineage and saveSourceTableLineage methods
+        tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "insert_t_job");
+        assertThatThrownBy(
+                        () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await())
+                .hasCauseExactlyInstanceOf(UnsupportedOperationException.class)
+                .hasRootCauseMessage("Method saveSinkTableLineage is not supported");
+
+        tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "select_t_job");
+        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM T").collect().close())
+                .hasCauseExactlyInstanceOf(UnsupportedOperationException.class)
+                .hasRootCauseMessage("Method saveSourceTableLineage is not supported");
+    }
+
+    /** Factory to create throwing lineage meta. */
+    public static class ThrowingLineageMetaFactory implements LineageMetaFactory {
+        @Override
+        public String identifier() {
+            return THROWING_META;
+        }
+
+        @Override
+        public LineageMeta create(LineageMetaContext context) {
+            return new ThrowingLineageMeta();
+        }
+    }
+
+    /**
+     * Throwing specific exception in each method.
+     *
+     * <p>The two table-lineage save methods first check the entity they receive and then throw
+     * an exception whose message names the method, so the test can match on it.
+     */
+    private static class ThrowingLineageMeta implements LineageMeta {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void saveSourceTableLineage(TableLineageEntity entity) {
+            // The SELECT statement in the test runs under the pipeline name "select_t_job".
+            assertEquals("select_t_job", entity.getJob());
+            assertEquals("T", entity.getTable());
+            assertEquals("default", entity.getDatabase());
+            throw new UnsupportedOperationException(
+                    "Method saveSourceTableLineage is not supported");
+        }
+
+        @Override
+        public void deleteSourceTableLineage(String job) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void saveSinkTableLineage(TableLineageEntity entity) {
+            // The INSERT statement in the test runs under the pipeline name "insert_t_job".
+            assertEquals("insert_t_job", entity.getJob());
+            assertEquals("T", entity.getTable());
+            assertEquals("default", entity.getDatabase());
+            throw new UnsupportedOperationException("Method saveSinkTableLineage is not supported");
+        }
+
+        @Override
+        public Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void deleteSinkTableLineage(String job) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void saveSourceDataLineage(DataLineageEntity entity) {
+            throw new UnsupportedOperationException(
+                    "Method saveSourceDataLineage is not supported");
+        }
+
+        @Override
+        public Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void saveSinkDataLineage(DataLineageEntity entity) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 014b45dc6..eb5278738 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
@@ -131,7 +132,6 @@ public class FlinkSinkTest {
tablePath,
tableSchema,
conf,
- Lock.emptyFactory(),
- null);
+ new CatalogEnvironment(Lock.emptyFactory(), null, null));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6b51f027e..71135fa1e 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,4 +16,7 @@
org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory
org.apache.paimon.flink.action.cdc.mysql.TestAlterTableCatalogFactory
-org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
\ No newline at end of file
+org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
+
+# Lineage meta factory
+org.apache.paimon.flink.FlinkLineageITCase$ThrowingLineageMetaFactory
\ No newline at end of file