This is an automated email from the ASF dual-hosted git repository.
liming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 475e48791b [core] Remove all lineage implementation (#4607)
475e48791b is described below
commit 475e48791b873c516c4c26774cda3b45a268cd70
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Nov 29 20:38:47 2024 +0800
[core] Remove all lineage implementation (#4607)
---
.../generated/catalog_configuration.html | 6 -
.../java/org/apache/paimon/factories/Factory.java | 2 +-
.../apache/paimon/lineage/DataLineageEntity.java | 33 ----
.../org/apache/paimon/lineage/LineageMeta.java | 102 ----------
.../apache/paimon/lineage/LineageMetaFactory.java | 37 ----
.../apache/paimon/lineage/TableLineageEntity.java | 32 ----
.../paimon/lineage/TableLineageEntityImpl.java | 56 ------
.../org/apache/paimon/options/CatalogOptions.java | 22 ---
.../org/apache/paimon/catalog/AbstractCatalog.java | 26 +--
.../apache/paimon/table/CatalogEnvironment.java | 18 +-
.../paimon/table/system/SinkTableLineageTable.java | 64 -------
.../table/system/SourceTableLineageTable.java | 64 -------
.../paimon/table/system/SystemTableLoader.java | 29 +--
.../paimon/table/system/TableLineageTable.java | 168 -----------------
.../paimon/flink/AbstractFlinkTableFactory.java | 58 +-----
.../apache/paimon/flink/CatalogTableITCase.java | 6 +-
.../apache/paimon/flink/FlinkLineageITCase.java | 206 ---------------------
.../services/org.apache.paimon.factories.Factory | 3 -
.../java/org/apache/paimon/hive/HiveCatalog.java | 3 +-
19 files changed, 12 insertions(+), 923 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 6706d5c421..63f7adda1e 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -86,12 +86,6 @@ under the License.
<td>Boolean</td>
<td>Whether to support format tables, format table corresponds to
a regular csv, parquet or orc table, allowing read and write operations.
However, during these processes, it does not connect to the metastore; hence,
newly added partitions will not be reflected in the metastore and need to be
manually added as separate partition operations.</td>
</tr>
- <tr>
- <td><h5>lineage-meta</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>The lineage meta to store table and data lineage
information.<br /><br />Possible values:<br /><ul><li>"jdbc": Use standard jdbc
to store table and data lineage information.</li></ul><ul><li>"custom": You can
implement LineageMetaFactory and LineageMeta to store lineage information in
customized storage.</li></ul></td>
- </tr>
<tr>
<td><h5>lock-acquire-timeout</h5></td>
<td style="word-wrap: break-word;">8 min</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
index b0f1ec84c1..74796879ef 100644
--- a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
@@ -20,7 +20,7 @@ package org.apache.paimon.factories;
/**
* Base interface for all kind of factories that create object instances from
a list of key-value
- * pairs in Paimon's catalog, lineage.
+ * pairs in Paimon's catalog.
*
* <p>A factory is uniquely identified by {@link Class} and {@link
#identifier()}.
*
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
deleted file mode 100644
index e7401a9be3..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.data.Timestamp;
-
-/**
- * Data lineage entity with table lineage, barrier id and snapshot id for
table source and sink
- * lineage.
- */
-public interface DataLineageEntity extends TableLineageEntity {
- long getBarrierId();
-
- long getSnapshotId();
-
- Timestamp getCreateTime();
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
deleted file mode 100644
index 5d1c42daf6..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.predicate.Predicate;
-
-import javax.annotation.Nullable;
-
-import java.util.Iterator;
-
-/** Metadata store will manage table lineage and data lineage information for
the catalog. */
-public interface LineageMeta extends AutoCloseable {
- /**
- * Save the source table and job lineage.
- *
- * @param entity the table lineage entity
- */
- void saveSourceTableLineage(TableLineageEntity entity);
-
- /**
- * Delete the source table lineage for given job.
- *
- * @param job the job for table lineage
- */
- void deleteSourceTableLineage(String job);
-
- /**
- * Get source table and job lineages.
- *
- * @param predicate the predicate for the table lineages
- * @return the iterator for source table and job lineages
- */
- Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate
predicate);
-
- /**
- * Save the sink table and job lineage.
- *
- * @param entity the table lineage entity
- */
- void saveSinkTableLineage(TableLineageEntity entity);
-
- /**
- * Get sink table and job lineages.
- *
- * @param predicate the predicate for the table lineages
- * @return the iterator for sink table and job lineages
- */
- Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate
predicate);
-
- /**
- * Delete the sink table lineage for given job.
- *
- * @param job the job for table lineage
- */
- void deleteSinkTableLineage(String job);
-
- /**
- * Save the source table and job lineage.
- *
- * @param entity the data lineage entity
- */
- void saveSourceDataLineage(DataLineageEntity entity);
-
- /**
- * Get source data and job lineages.
- *
- * @param predicate the predicate for the table lineages
- * @return the iterator for source table and job lineages
- */
- Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate
predicate);
-
- /**
- * Save the sink table and job lineage.
- *
- * @param entity the data lineage entity
- */
- void saveSinkDataLineage(DataLineageEntity entity);
-
- /**
- * Get sink data and job lineages.
- *
- * @param predicate the predicate for the table lineages
- * @return the iterator for sink table and job lineages
- */
- Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate
predicate);
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
deleted file mode 100644
index 11c6d3a117..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.factories.Factory;
-import org.apache.paimon.options.Options;
-
-import java.io.Serializable;
-
-/** Factory to create {@link LineageMeta}. Each factory should have a unique
identifier. */
-public interface LineageMetaFactory extends Factory, Serializable {
-
- LineageMeta create(LineageMetaContext context);
-
- /**
- * Context has all options in a catalog and is used in factory to create
{@link LineageMeta}.
- */
- interface LineageMetaContext {
- Options options();
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
deleted file mode 100644
index c4312c4eb0..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.data.Timestamp;
-
-/** Table lineage entity with database, table and job for table source and
sink lineage. */
-public interface TableLineageEntity {
- String getDatabase();
-
- String getTable();
-
- String getJob();
-
- Timestamp getCreateTime();
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
deleted file mode 100644
index ef11ee87f1..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.data.Timestamp;
-
-/** Default implementation for {@link TableLineageEntity}. */
-public class TableLineageEntityImpl implements TableLineageEntity {
- private final String database;
- private final String table;
- private final String job;
- private final Timestamp timestamp;
-
- public TableLineageEntityImpl(String database, String table, String job,
Timestamp timestamp) {
- this.database = database;
- this.table = table;
- this.job = job;
- this.timestamp = timestamp;
- }
-
- @Override
- public String getDatabase() {
- return database;
- }
-
- @Override
- public String getTable() {
- return table;
- }
-
- @Override
- public String getJob() {
- return job;
- }
-
- @Override
- public Timestamp getCreateTime() {
- return timestamp;
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index f69af2d599..bb8cfae682 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -18,8 +18,6 @@
package org.apache.paimon.options;
-import org.apache.paimon.options.description.Description;
-import org.apache.paimon.options.description.TextElement;
import org.apache.paimon.table.CatalogTableType;
import java.time.Duration;
@@ -130,26 +128,6 @@ public class CatalogOptions {
.withDescription(
"Controls the max number for snapshots per table
in the catalog are cached.");
- public static final ConfigOption<String> LINEAGE_META =
- key("lineage-meta")
- .stringType()
- .noDefaultValue()
- .withDescription(
- Description.builder()
- .text(
- "The lineage meta to store table
and data lineage information.")
- .linebreak()
- .linebreak()
- .text("Possible values:")
- .linebreak()
- .list(
- TextElement.text(
- "\"jdbc\": Use standard
jdbc to store table and data lineage information."))
- .list(
- TextElement.text(
- "\"custom\": You can
implement LineageMetaFactory and LineageMeta to store lineage information in
customized storage."))
- .build());
-
public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
ConfigOptions.key("allow-upper-case")
.booleanType()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 16b76513d7..2b277a29b8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -24,7 +24,6 @@ import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommit;
@@ -62,7 +61,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
-import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -76,19 +74,14 @@ public abstract class AbstractCatalog implements Catalog {
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
- @Nullable protected final LineageMetaFactory lineageMetaFactory;
-
protected AbstractCatalog(FileIO fileIO) {
this.fileIO = fileIO;
- this.lineageMetaFactory = null;
this.tableDefaultOptions = new HashMap<>();
this.catalogOptions = new Options();
}
protected AbstractCatalog(FileIO fileIO, Options options) {
this.fileIO = fileIO;
- this.lineageMetaFactory =
- findAndCreateLineageMeta(options,
AbstractCatalog.class.getClassLoader());
this.tableDefaultOptions =
Catalog.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}
@@ -377,27 +370,13 @@ public abstract class AbstractCatalog implements Catalog {
protected abstract void alterTableImpl(Identifier identifier,
List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
- @Nullable
- private LineageMetaFactory findAndCreateLineageMeta(Options options,
ClassLoader classLoader) {
- return options.getOptional(LINEAGE_META)
- .map(
- meta ->
- FactoryUtil.discoverFactory(
- classLoader, LineageMetaFactory.class,
meta))
- .orElse(null);
- }
-
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
if (isSystemDatabase(identifier.getDatabaseName())) {
String tableName = identifier.getTableName();
Table table =
SystemTableLoader.loadGlobal(
- tableName,
- fileIO,
- this::allTablePaths,
- catalogOptions,
- lineageMetaFactory);
+ tableName, fileIO, this::allTablePaths,
catalogOptions);
if (table == null) {
throw new TableNotExistException(identifier);
}
@@ -444,8 +423,7 @@ public abstract class AbstractCatalog implements Catalog {
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
- metastoreClientFactory(identifier,
tableMeta.schema).orElse(null),
- lineageMetaFactory));
+ metastoreClientFactory(identifier,
tableMeta.schema).orElse(null)));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 9ff5f9b4f6..a722d9e21a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -19,7 +19,6 @@
package org.apache.paimon.table;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.Lock;
@@ -27,10 +26,7 @@ import javax.annotation.Nullable;
import java.io.Serializable;
-/**
- * Catalog environment in table which contains log factory, metastore client
factory and lineage
- * meta.
- */
+/** Catalog environment in table which contains log factory, metastore client
factory. */
public class CatalogEnvironment implements Serializable {
private static final long serialVersionUID = 1L;
@@ -39,23 +35,20 @@ public class CatalogEnvironment implements Serializable {
@Nullable private final String uuid;
private final Lock.Factory lockFactory;
@Nullable private final MetastoreClient.Factory metastoreClientFactory;
- @Nullable private final LineageMetaFactory lineageMetaFactory;
public CatalogEnvironment(
@Nullable Identifier identifier,
@Nullable String uuid,
Lock.Factory lockFactory,
- @Nullable MetastoreClient.Factory metastoreClientFactory,
- @Nullable LineageMetaFactory lineageMetaFactory) {
+ @Nullable MetastoreClient.Factory metastoreClientFactory) {
this.identifier = identifier;
this.uuid = uuid;
this.lockFactory = lockFactory;
this.metastoreClientFactory = metastoreClientFactory;
- this.lineageMetaFactory = lineageMetaFactory;
}
public static CatalogEnvironment empty() {
- return new CatalogEnvironment(null, null, Lock.emptyFactory(), null,
null);
+ return new CatalogEnvironment(null, null, Lock.emptyFactory(), null);
}
@Nullable
@@ -76,9 +69,4 @@ public class CatalogEnvironment implements Serializable {
public MetastoreClient.Factory metastoreClientFactory() {
return metastoreClientFactory;
}
-
- @Nullable
- public LineageMetaFactory lineageMetaFactory() {
- return lineageMetaFactory;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
deleted file mode 100644
index 71efce0704..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.system;
-
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerTableRead;
-
-import java.util.Map;
-
-/**
- * This is a system table to display all the sink table lineages.
- *
- * <pre>
- * For example:
- * If we select * from sys.sink_table_lineage, we will get
- * database_name table_name job_name create_time
- * default test0 job1 2023-10-22 20:35:12
- * database1 test1 job1 2023-10-28 21:35:52
- * ... ... ... ...
- * We can write sql to fetch the information we need.
- * </pre>
- */
-public class SinkTableLineageTable extends TableLineageTable {
-
- public static final String SINK_TABLE_LINEAGE = "sink_table_lineage";
-
- public SinkTableLineageTable(LineageMetaFactory lineageMetaFactory,
Options options) {
- super(lineageMetaFactory, options);
- }
-
- @Override
- public InnerTableRead newRead() {
- return new TableLineageRead(lineageMetaFactory, options,
LineageMeta::sinkTableLineages);
- }
-
- @Override
- public String name() {
- return SINK_TABLE_LINEAGE;
- }
-
- @Override
- public Table copy(Map<String, String> dynamicOptions) {
- return new SinkTableLineageTable(lineageMetaFactory, options);
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
deleted file mode 100644
index 5d9904fa66..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.system;
-
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerTableRead;
-
-import java.util.Map;
-
-/**
- * This is a system table to display all the source table lineages.
- *
- * <pre>
- * For example:
- * If we select * from sys.source_table_lineage, we will get
- * database_name table_name job_name create_time
- * default test0 job1 2023-10-22 20:35:12
- * database1 test1 job1 2023-10-28 21:35:52
- * ... ... ... ...
- * We can write sql to fetch the information we need.
- * </pre>
- */
-public class SourceTableLineageTable extends TableLineageTable {
-
- public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
-
- public SourceTableLineageTable(LineageMetaFactory lineageMetaFactory,
Options options) {
- super(lineageMetaFactory, options);
- }
-
- @Override
- public InnerTableRead newRead() {
- return new TableLineageRead(lineageMetaFactory, options,
LineageMeta::sourceTableLineages);
- }
-
- @Override
- public String name() {
- return SOURCE_TABLE_LINEAGE;
- }
-
- @Override
- public Table copy(Map<String, String> dynamicOptions) {
- return new SourceTableLineageTable(lineageMetaFactory, options);
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 3d5b211316..763e4d1216 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.system;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -37,7 +36,6 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
-import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static
org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION_FIELDS;
import static
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
@@ -52,12 +50,9 @@ import static
org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
-import static
org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
-import static
org.apache.paimon.table.system.SourceTableLineageTable.SOURCE_TABLE_LINEAGE;
import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
import static org.apache.paimon.table.system.TagsTable.TAGS;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Loader to load system {@link Table}s. */
public class SystemTableLoader {
@@ -95,38 +90,18 @@ public class SystemTableLoader {
String tableName,
FileIO fileIO,
Supplier<Map<String, Map<String, Path>>> allTablePaths,
- Options catalogOptions,
- @Nullable LineageMetaFactory lineageMetaFactory) {
+ Options catalogOptions) {
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
return new AllTableOptionsTable(fileIO, allTablePaths.get());
case CATALOG_OPTIONS:
return new CatalogOptionsTable(catalogOptions);
- case SOURCE_TABLE_LINEAGE:
- {
- checkNotNull(
- lineageMetaFactory,
- String.format(
- "Lineage meta should be configured for
catalog with %s",
- LINEAGE_META.key()));
- return new SourceTableLineageTable(lineageMetaFactory,
catalogOptions);
- }
- case SINK_TABLE_LINEAGE:
- {
- checkNotNull(
- lineageMetaFactory,
- String.format(
- "Lineage meta should be configured for
catalog with %s",
- LINEAGE_META.key()));
- return new SinkTableLineageTable(lineageMetaFactory,
catalogOptions);
- }
default:
return null;
}
}
public static List<String> loadGlobalTableNames() {
- return Arrays.asList(
- ALL_TABLE_OPTIONS, CATALOG_OPTIONS, SOURCE_TABLE_LINEAGE,
SINK_TABLE_LINEAGE);
+ return Arrays.asList(ALL_TABLE_OPTIONS, CATALOG_OPTIONS);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
deleted file mode 100644
index aeaf3ca3b1..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.system;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.ReadonlyTable;
-import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.ReadOnceTableScan;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.IteratorRecordReader;
-import org.apache.paimon.utils.ProjectedRow;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.BiFunction;
-
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-
-/** Base lineage table for source and sink table lineage. */
-public abstract class TableLineageTable implements ReadonlyTable {
- protected final LineageMetaFactory lineageMetaFactory;
- protected final Options options;
-
- public static final RowType TABLE_TYPE =
- new RowType(
- Arrays.asList(
- new DataField(
- 0, "database_name", new
VarCharType(VarCharType.MAX_LENGTH)),
- new DataField(1, "table_name", new
VarCharType(VarCharType.MAX_LENGTH)),
- new DataField(2, "job_name", new
VarCharType(VarCharType.MAX_LENGTH)),
- new DataField(3, "create_time", new
TimestampType())));
-
- protected TableLineageTable(LineageMetaFactory lineageMetaFactory, Options
options) {
- this.lineageMetaFactory = lineageMetaFactory;
- this.options = options;
- }
-
- @Override
- public InnerTableScan newScan() {
- return new ReadOnceTableScan() {
- @Override
- public InnerTableScan withFilter(Predicate predicate) {
- return this;
- }
-
- @Override
- protected Plan innerPlan() {
- /// TODO get the real row count for plan.
- return () -> Collections.singletonList((Split) () -> 1L);
- }
- };
- }
-
- @Override
- public RowType rowType() {
- return TABLE_TYPE;
- }
-
- @Override
- public List<String> primaryKeys() {
- return Arrays.asList("database_name", "table_name", "job_name");
- }
-
- /** Table lineage read with lineage meta query. */
- protected static class TableLineageRead implements InnerTableRead {
- private final LineageMetaFactory lineageMetaFactory;
- private final Options options;
- private final BiFunction<LineageMeta, Predicate,
Iterator<TableLineageEntity>>
- tableLineageQuery;
- @Nullable private Predicate predicate;
- private RowType readType;
-
- protected TableLineageRead(
- LineageMetaFactory lineageMetaFactory,
- Options options,
- BiFunction<LineageMeta, Predicate,
Iterator<TableLineageEntity>>
- tableLineageQuery) {
- this.lineageMetaFactory = lineageMetaFactory;
- this.options = options;
- this.tableLineageQuery = tableLineageQuery;
- this.predicate = null;
- }
-
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
- this.predicate = predicate;
- return this;
- }
-
- @Override
- public InnerTableRead withReadType(RowType readType) {
- this.readType = readType;
- return this;
- }
-
- @Override
- public TableRead withIOManager(IOManager ioManager) {
- return this;
- }
-
- @Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
- try (LineageMeta lineageMeta = lineageMetaFactory.create(() ->
options)) {
- Iterator<TableLineageEntity> sourceTableLineages =
- tableLineageQuery.apply(lineageMeta, predicate);
- return new IteratorRecordReader<>(
- Iterators.transform(
- sourceTableLineages,
- entity -> {
- checkNotNull(entity);
- GenericRow row =
- GenericRow.of(
-
BinaryString.fromString(entity.getDatabase()),
-
BinaryString.fromString(entity.getTable()),
-
BinaryString.fromString(entity.getJob()),
- entity.getCreateTime());
- if (readType != null) {
- return ProjectedRow.from(
- readType,
TableLineageTable.TABLE_TYPE)
- .replaceRow(row);
- } else {
- return row;
- }
- }));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 9f90a2cd01..6b10dbb84b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -25,15 +25,10 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.lineage.TableLineageEntityImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
@@ -47,7 +42,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
@@ -71,7 +65,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
@@ -109,23 +102,9 @@ public abstract class AbstractFlinkTableFactory
isStreamingMode,
context.getObjectIdentifier());
} else {
- Table table = buildPaimonTable(context);
- if (table instanceof FileStoreTable) {
- storeTableLineage(
- ((FileStoreTable)
table).catalogEnvironment().lineageMetaFactory(),
- context,
- (entity, lineageFactory) -> {
- try (LineageMeta lineage =
- lineageFactory.create(() ->
Options.fromMap(table.options()))) {
- lineage.saveSourceTableLineage(entity);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- }
return new DataTableSource(
context.getObjectIdentifier(),
- table,
+ buildPaimonTable(context),
isStreamingMode,
context,
createOptionalLogStoreFactory(context).orElse(null));
@@ -134,46 +113,13 @@ public abstract class AbstractFlinkTableFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
- Table table = buildPaimonTable(context);
- if (table instanceof FileStoreTable) {
- storeTableLineage(
- ((FileStoreTable)
table).catalogEnvironment().lineageMetaFactory(),
- context,
- (entity, lineageFactory) -> {
- try (LineageMeta lineage =
- lineageFactory.create(() ->
Options.fromMap(table.options()))) {
- lineage.saveSinkTableLineage(entity);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- }
return new FlinkTableSink(
context.getObjectIdentifier(),
- table,
+ buildPaimonTable(context),
context,
createOptionalLogStoreFactory(context).orElse(null));
}
- private void storeTableLineage(
- @Nullable LineageMetaFactory lineageMetaFactory,
- Context context,
- BiConsumer<TableLineageEntity, LineageMetaFactory> tableLineage) {
- if (lineageMetaFactory != null) {
- String pipelineName =
context.getConfiguration().get(PipelineOptions.NAME);
- if (pipelineName == null) {
- throw new ValidationException("Cannot get pipeline name for
lineage meta.");
- }
- tableLineage.accept(
- new TableLineageEntityImpl(
- context.getObjectIdentifier().getDatabaseName(),
- context.getObjectIdentifier().getObjectName(),
- pipelineName,
-
Timestamp.fromEpochMillis(System.currentTimeMillis())),
- lineageMetaFactory);
- }
- }
-
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 96334de3f8..10b03b7139 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -21,8 +21,6 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
-import org.apache.paimon.table.system.SinkTableLineageTable;
-import org.apache.paimon.table.system.SourceTableLineageTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.commons.lang3.StringUtils;
@@ -200,9 +198,7 @@ public class CatalogTableITCase extends CatalogITCaseBase {
assertThat(sql("SHOW TABLES"))
.containsExactlyInAnyOrder(
Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS),
- Row.of(CatalogOptionsTable.CATALOG_OPTIONS),
- Row.of(SourceTableLineageTable.SOURCE_TABLE_LINEAGE),
- Row.of(SinkTableLineageTable.SINK_TABLE_LINEAGE));
+ Row.of(CatalogOptionsTable.CATALOG_OPTIONS));
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
deleted file mode 100644
index 5b61d5272f..0000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.lineage.DataLineageEntity;
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.predicate.Predicate;
-
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** ITCase for flink table and data lineage. */
-public class FlinkLineageITCase extends CatalogITCaseBase {
- private static final String THROWING_META = "throwing-meta";
- private static final Map<String, Map<String, TableLineageEntity>>
jobSourceTableLineages =
- new HashMap<>();
- private static final Map<String, Map<String, TableLineageEntity>>
jobSinkTableLineages =
- new HashMap<>();
-
- @Override
- protected List<String> ddl() {
- return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT,
b INT, c INT)");
- }
-
- @Override
- protected Map<String, String> catalogOptions() {
- return Collections.singletonMap(LINEAGE_META.key(), THROWING_META);
- }
-
- @Test
- public void testTableLineage() throws Exception {
- // Validate for source and sink lineage when pipeline name is null
- assertThatThrownBy(
- () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2,
3),(4, 5, 6);").await())
- .hasCauseExactlyInstanceOf(ValidationException.class)
- .hasRootCauseMessage("Cannot get pipeline name for lineage
meta.");
- assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM
T").collect().close())
- .hasCauseExactlyInstanceOf(ValidationException.class)
- .hasRootCauseMessage("Cannot get pipeline name for lineage
meta.");
-
- // Call storeSinkTableLineage and storeSourceTableLineage methods
- tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME,
"insert_t_job");
- tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
- assertThat(jobSinkTableLineages).isNotEmpty();
- TableLineageEntity sinkTableLineage =
-
jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job");
- assertThat(sinkTableLineage.getTable()).isEqualTo("T");
-
- List<Row> sinkTableRows = new ArrayList<>();
- try (CloseableIterator<Row> iterator =
- tEnv.executeSql("SELECT * FROM
sys.sink_table_lineage").collect()) {
- while (iterator.hasNext()) {
- sinkTableRows.add(iterator.next());
- }
- }
- assertThat(sinkTableRows.size()).isEqualTo(1);
- Row sinkTableRow = sinkTableRows.get(0);
-
assertThat(sinkTableRow.getField("database_name")).isEqualTo("default");
- assertThat(sinkTableRow.getField("table_name")).isEqualTo("T");
-
assertThat(sinkTableRow.getField("job_name")).isEqualTo("insert_t_job");
-
- tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME,
"select_t_job");
- tEnv.executeSql("SELECT * FROM T").collect().close();
- assertThat(jobSourceTableLineages).isNotEmpty();
- TableLineageEntity sourceTableLineage =
-
jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job");
- assertThat(sourceTableLineage.getTable()).isEqualTo("T");
-
- List<Row> sourceTableRows = new ArrayList<>();
- try (CloseableIterator<Row> iterator =
- tEnv.executeSql("SELECT * FROM
sys.source_table_lineage").collect()) {
- while (iterator.hasNext()) {
- sourceTableRows.add(iterator.next());
- }
- }
- assertThat(sourceTableRows.size()).isEqualTo(1);
- Row sourceTableRow = sourceTableRows.get(0);
-
assertThat(sourceTableRow.getField("database_name")).isEqualTo("default");
- assertThat(sourceTableRow.getField("table_name")).isEqualTo("T");
-
assertThat(sourceTableRow.getField("job_name")).isEqualTo("select_t_job");
- }
-
- private static String getTableLineageKey(TableLineageEntity entity) {
- return String.format("%s.%s.%s", entity.getDatabase(),
entity.getTable(), entity.getJob());
- }
-
- /** Factory to create throwing lineage meta. */
- public static class TestingMemoryLineageMetaFactory implements
LineageMetaFactory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String identifier() {
- return THROWING_META;
- }
-
- @Override
- public LineageMeta create(LineageMetaContext context) {
- return new TestingMemoryLineageMeta();
- }
- }
-
- /** Throwing specific exception in each method. */
- private static class TestingMemoryLineageMeta implements LineageMeta {
-
- @Override
- public void saveSourceTableLineage(TableLineageEntity entity) {
- jobSourceTableLineages
- .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
- .put(getTableLineageKey(entity), entity);
- }
-
- @Override
- public void deleteSourceTableLineage(String job) {
- jobSourceTableLineages.remove(job);
- }
-
- @Override
- public Iterator<TableLineageEntity> sourceTableLineages(@Nullable
Predicate predicate) {
- return jobSourceTableLineages.values().stream()
- .flatMap(v -> v.values().stream())
- .iterator();
- }
-
- @Override
- public void saveSinkTableLineage(TableLineageEntity entity) {
- assertThat(entity.getJob()).isEqualTo("insert_t_job");
- assertThat(entity.getTable()).isEqualTo("T");
- assertThat(entity.getDatabase()).isEqualTo("default");
- jobSinkTableLineages
- .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
- .put(getTableLineageKey(entity), entity);
- }
-
- @Override
- public Iterator<TableLineageEntity> sinkTableLineages(@Nullable
Predicate predicate) {
- return jobSinkTableLineages.values().stream()
- .flatMap(v -> v.values().stream())
- .iterator();
- }
-
- @Override
- public void deleteSinkTableLineage(String job) {
- jobSinkTableLineages.remove(job);
- }
-
- @Override
- public void saveSourceDataLineage(DataLineageEntity entity) {
- assertThat(entity.getJob()).isEqualTo("select_t_job");
- assertThat(entity.getTable()).isEqualTo("T");
- assertThat(entity.getDatabase()).isEqualTo("default");
- throw new UnsupportedOperationException("Method
saveSinkTableLineage is not supported");
- }
-
- @Override
- public Iterator<DataLineageEntity> sourceDataLineages(@Nullable
Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void saveSinkDataLineage(DataLineageEntity entity) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<DataLineageEntity> sinkDataLineages(@Nullable
Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws Exception {}
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index fcb6fe9829..3c05b5fba3 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,8 +15,5 @@
org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
-# Lineage meta factory
-org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
-
# Catalog lock factory
org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
\ No newline at end of file
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 9a90995f28..5157e60600 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -610,8 +610,7 @@ public class HiveCatalog extends AbstractCatalog {
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
- metastoreClientFactory(identifier,
tableMeta.schema()).orElse(null),
- lineageMetaFactory));
+ metastoreClientFactory(identifier,
tableMeta.schema()).orElse(null)));
} catch (TableNotExistException ignore) {
}