This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e93b425a [lineage] Add system source table lineage meta table (#2210)
9e93b425a is described below

commit 9e93b425ad80155a441955b95affb4c9fed95e14
Author: Fang Yong <[email protected]>
AuthorDate: Thu Nov 2 08:36:57 2023 +0800

    [lineage] Add system source table lineage meta table (#2210)
    
    * [lineage] Add system source table lineage meta table
---
 .../apache/paimon/lineage/LineageMetaFactory.java  |   4 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java |   6 +-
 .../table/system/SourceTableLineageTable.java      | 190 +++++++++++++++++++++
 .../paimon/table/system/SystemTableLoader.java     |  16 +-
 .../apache/paimon/flink/FlinkLineageITCase.java    |  54 ++++--
 .../services/org.apache.paimon.factories.Factory   |   2 +-
 6 files changed, 255 insertions(+), 17 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
index 5cb7f8bae..11c6d3a11 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
@@ -21,8 +21,10 @@ package org.apache.paimon.lineage;
 import org.apache.paimon.factories.Factory;
 import org.apache.paimon.options.Options;
 
+import java.io.Serializable;
+
 /** Factory to create {@link LineageMeta}. Each factory should have a unique identifier. */
-public interface LineageMetaFactory extends Factory {
+public interface LineageMetaFactory extends Factory, Serializable {
 
     LineageMeta create(LineageMetaContext context);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 75c476cc8..0ed1d842c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -244,7 +244,11 @@ public abstract class AbstractCatalog implements Catalog {
             String tableName = identifier.getObjectName();
             Table table =
                     SystemTableLoader.loadGlobal(
-                            tableName, fileIO, this::allTablePaths, 
catalogOptions);
+                            tableName,
+                            fileIO,
+                            this::allTablePaths,
+                            catalogOptions,
+                            lineageMetaFactory);
             if (table == null) {
                 throw new TableNotExistException(identifier);
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
new file mode 100644
index 000000000..0cac3852f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.lineage.TableLineageEntity;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * This is a system table to display all the source table lineages.
+ *
+ * <pre>
+ *  For example:
+ *     If we select * from sys.source_table_lineage, we will get
+ *     database_name       table_name       job_name      create_time
+ *        default            test0            job1    2023-10-22 20:35:12
+ *       database1           test1            job1    2023-10-28 21:35:52
+ *          ...               ...             ...             ...
+ *     We can write sql to fetch the information we need.
+ * </pre>
+ */
+public class SourceTableLineageTable implements ReadonlyTable {
+
+    public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
+
+    private final LineageMetaFactory lineageMetaFactory;
+    private final Map<String, String> options;
+
+    public SourceTableLineageTable(
+            LineageMetaFactory lineageMetaFactory, Map<String, String> 
options) {
+        this.lineageMetaFactory = lineageMetaFactory;
+        this.options = options;
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        return new ReadOnceTableScan() {
+            @Override
+            public InnerTableScan withFilter(Predicate predicate) {
+                return this;
+            }
+
+            @Override
+            protected Plan innerPlan() {
+                /// TODO get the real row count for plan.
+                return () -> Collections.singletonList((Split) () -> 1L);
+            }
+        };
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new SourceTableLineageRead(lineageMetaFactory, options);
+    }
+
+    @Override
+    public String name() {
+        return SOURCE_TABLE_LINEAGE;
+    }
+
+    @Override
+    public RowType rowType() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "database_name", new 
VarCharType(VarCharType.MAX_LENGTH)));
+        fields.add(new DataField(1, "table_name", new 
VarCharType(VarCharType.MAX_LENGTH)));
+        fields.add(new DataField(2, "job_name", new 
VarCharType(VarCharType.MAX_LENGTH)));
+        fields.add(new DataField(3, "create_time", new TimestampType()));
+        return new RowType(fields);
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return Arrays.asList("database_name", "table_name", "job_name");
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new SourceTableLineageTable(lineageMetaFactory, options);
+    }
+
+    /** Source table lineage read. */
+    private static class SourceTableLineageRead implements InnerTableRead {
+        private final LineageMetaFactory lineageMetaFactory;
+        private final Map<String, String> options;
+        @Nullable private Predicate predicate;
+        private int[][] projection;
+
+        private SourceTableLineageRead(
+                LineageMetaFactory lineageMetaFactory, Map<String, String> 
options) {
+            this.lineageMetaFactory = lineageMetaFactory;
+            this.options = options;
+            this.predicate = null;
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            this.predicate = predicate;
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withProjection(int[][] projection) {
+            this.projection = projection;
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+            try (LineageMeta lineageMeta =
+                    lineageMetaFactory.create(() -> Options.fromMap(options))) 
{
+                Iterator<TableLineageEntity> sourceTableLineages =
+                        lineageMeta.sourceTableLineages(predicate);
+                return new IteratorRecordReader<>(
+                        Iterators.transform(
+                                sourceTableLineages,
+                                entity -> {
+                                    checkNotNull(entity);
+                                    GenericRow row =
+                                            GenericRow.of(
+                                                    
BinaryString.fromString(entity.getDatabase()),
+                                                    
BinaryString.fromString(entity.getTable()),
+                                                    
BinaryString.fromString(entity.getJob()),
+                                                    entity.getCreateTime());
+                                    if (projection != null) {
+                                        return 
ProjectedRow.from(projection).replaceRow(row);
+                                    } else {
+                                        return row;
+                                    }
+                                }));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index a241f51e3..91c7b92f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.system;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
@@ -28,6 +29,7 @@ import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static 
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
 import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
 import static 
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
@@ -38,7 +40,9 @@ import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
+import static 
org.apache.paimon.table.system.SourceTableLineageTable.SOURCE_TABLE_LINEAGE;
 import static org.apache.paimon.table.system.TagsTable.TAGS;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Loader to load system {@link Table}s. */
 public class SystemTableLoader {
@@ -75,12 +79,22 @@ public class SystemTableLoader {
             String tableName,
             FileIO fileIO,
             Supplier<Map<String, Map<String, Path>>> allTablePaths,
-            Map<String, String> catalogOptions) {
+            Map<String, String> catalogOptions,
+            @Nullable LineageMetaFactory lineageMetaFactory) {
         switch (tableName.toLowerCase()) {
             case ALL_TABLE_OPTIONS:
                 return new AllTableOptionsTable(fileIO, allTablePaths.get());
             case CATALOG_OPTIONS:
                 return new CatalogOptionsTable(catalogOptions);
+            case SOURCE_TABLE_LINEAGE:
+                {
+                    checkNotNull(
+                            lineageMetaFactory,
+                            String.format(
+                                    "Lineage meta should be configured for 
catalog with %s",
+                                    LINEAGE_META.key()));
+                    return new SourceTableLineageTable(lineageMetaFactory, 
catalogOptions);
+                }
             default:
                 return null;
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
index ba1724706..0625facbf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
@@ -26,11 +26,15 @@ import org.apache.paimon.predicate.Predicate;
 
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +46,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** ITCase for flink table and data lineage. */
 public class FlinkLineageITCase extends CatalogITCaseBase {
     private static final String THROWING_META = "throwing-meta";
+    private static final Map<String, Map<String, TableLineageEntity>> 
jobSourceTableLineages =
+            new HashMap<>();
 
     @Override
     protected List<String> ddl() {
@@ -54,7 +60,7 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testTableLineage() {
+    public void testTableLineage() throws Exception {
         // Validate for source and sink lineage when pipeline name is null
         assertThatThrownBy(
                         () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 
3),(4, 5, 6);").await())
@@ -72,13 +78,34 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
                 .hasRootCauseMessage("Method saveSinkTableLineage is not 
supported");
 
         tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, 
"select_t_job");
-        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM 
T").collect().close())
-                .hasRootCauseInstanceOf(UnsupportedOperationException.class)
-                .hasRootCauseMessage("Method saveSourceTableLineage is not 
supported");
+        tEnv.executeSql("SELECT * FROM T").collect().close();
+        assertThat(jobSourceTableLineages).isNotEmpty();
+        TableLineageEntity sourceTableLineage =
+                
jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job");
+        assertThat(sourceTableLineage.getTable()).isEqualTo("T");
+
+        List<Row> lineageRows = new ArrayList<>();
+        try (CloseableIterator<Row> iterator =
+                tEnv.executeSql("SELECT * FROM 
sys.source_table_lineage").collect()) {
+            while (iterator.hasNext()) {
+                lineageRows.add(iterator.next());
+            }
+        }
+        assertThat(lineageRows.size()).isEqualTo(1);
+        Row lineageRow = lineageRows.get(0);
+        assertThat(lineageRow.getField("database_name")).isEqualTo("default");
+        assertThat(lineageRow.getField("table_name")).isEqualTo("T");
+        assertThat(lineageRow.getField("job_name")).isEqualTo("select_t_job");
+    }
+
+    private static String getTableLineageKey(TableLineageEntity entity) {
+        return String.format("%s.%s.%s", entity.getDatabase(), 
entity.getTable(), entity.getJob());
     }
 
     /** Factory to create throwing lineage meta. */
-    public static class ThrowingLineageMetaFactory implements 
LineageMetaFactory {
+    public static class TestingMemoryLineageMetaFactory implements 
LineageMetaFactory {
+        private static final long serialVersionUID = 1L;
+
         @Override
         public String identifier() {
             return THROWING_META;
@@ -86,29 +113,30 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
 
         @Override
         public LineageMeta create(LineageMetaContext context) {
-            return new ThrowingLineageMeta();
+            return new TestingMemoryLineageMeta();
         }
     }
 
     /** Throwing specific exception in each method. */
-    private static class ThrowingLineageMeta implements LineageMeta {
-
-        private static final long serialVersionUID = 1L;
+    private static class TestingMemoryLineageMeta implements LineageMeta {
 
         @Override
         public void saveSourceTableLineage(TableLineageEntity entity) {
-            throw new UnsupportedOperationException(
-                    "Method saveSourceTableLineage is not supported");
+            jobSourceTableLineages
+                    .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
+                    .put(getTableLineageKey(entity), entity);
         }
 
         @Override
         public void deleteSourceTableLineage(String job) {
-            throw new UnsupportedOperationException();
+            jobSourceTableLineages.remove(job);
         }
 
         @Override
         public Iterator<TableLineageEntity> sourceTableLineages(@Nullable 
Predicate predicate) {
-            throw new UnsupportedOperationException();
+            return jobSourceTableLineages.values().stream()
+                    .flatMap(v -> v.values().stream())
+                    .iterator();
         }
 
         @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 5058dabb6..22e88ba48 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,4 +16,4 @@
 org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
 
 # Lineage meta factory
-org.apache.paimon.flink.FlinkLineageITCase$ThrowingLineageMetaFactory
\ No newline at end of file
+org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
\ No newline at end of file

Reply via email to