This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a7047f0a7 [lineage] Add system sink table lineage meta table (#2243)
a7047f0a7 is described below

commit a7047f0a7a7532ccde549d9e6c85377bfc9a6c54
Author: Fang Yong <[email protected]>
AuthorDate: Mon Nov 6 09:55:51 2023 +0800

    [lineage] Add system sink table lineage meta table (#2243)
---
 .../paimon/table/system/SinkTableLineageTable.java |  64 ++++++++++
 .../table/system/SourceTableLineageTable.java      | 132 +--------------------
 .../paimon/table/system/SystemTableLoader.java     |  10 ++
 ...bleLineageTable.java => TableLineageTable.java} |  58 +++------
 .../apache/paimon/flink/FlinkLineageITCase.java    |  48 +++++---
 5 files changed, 128 insertions(+), 184 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
new file mode 100644
index 000000000..72d6f0fe2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+
+import java.util.Map;
+
+/**
+ * This is a system table to display all the sink table lineages.
+ *
+ * <pre>
+ *  For example:
+ *     If we select * from sys.sink_table_lineage, we will get
+ *     database_name       table_name       job_name      create_time
+ *        default            test0            job1    2023-10-22 20:35:12
+ *       database1           test1            job1    2023-10-28 21:35:52
+ *          ...               ...             ...             ...
+ *     We can write sql to fetch the information we need.
+ * </pre>
+ */
+public class SinkTableLineageTable extends TableLineageTable {
+
+    public static final String SINK_TABLE_LINEAGE = "sink_table_lineage";
+
+    public SinkTableLineageTable(
+            LineageMetaFactory lineageMetaFactory, Map<String, String> options) {
+        super(lineageMetaFactory, options);
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new TableLineageRead(lineageMetaFactory, options, LineageMeta::sinkTableLineages);
+    }
+
+    @Override
+    public String name() {
+        return SINK_TABLE_LINEAGE;
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new SinkTableLineageTable(lineageMetaFactory, options);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
index 0cac3852f..bdd6af65b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
@@ -18,44 +18,13 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.lineage.LineageMeta;
 import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.ReadOnceTableScan;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.IteratorRecordReader;
-import org.apache.paimon.utils.ProjectedRow;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-
 /**
  * This is a system table to display all the source table lineages.
  *
@@ -69,38 +38,18 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
  *     We can write sql to fetch the information we need.
  * </pre>
  */
-public class SourceTableLineageTable implements ReadonlyTable {
+public class SourceTableLineageTable extends TableLineageTable {
 
     public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
 
-    private final LineageMetaFactory lineageMetaFactory;
-    private final Map<String, String> options;
-
     public SourceTableLineageTable(
             LineageMetaFactory lineageMetaFactory, Map<String, String> options) {
-        this.lineageMetaFactory = lineageMetaFactory;
-        this.options = options;
-    }
-
-    @Override
-    public InnerTableScan newScan() {
-        return new ReadOnceTableScan() {
-            @Override
-            public InnerTableScan withFilter(Predicate predicate) {
-                return this;
-            }
-
-            @Override
-            protected Plan innerPlan() {
-                /// TODO get the real row count for plan.
-                return () -> Collections.singletonList((Split) () -> 1L);
-            }
-        };
+        super(lineageMetaFactory, options);
     }
 
     @Override
     public InnerTableRead newRead() {
-        return new SourceTableLineageRead(lineageMetaFactory, options);
+        return new TableLineageRead(lineageMetaFactory, options, LineageMeta::sourceTableLineages);
     }
 
     @Override
@@ -108,83 +57,8 @@ public class SourceTableLineageTable implements ReadonlyTable {
         return SOURCE_TABLE_LINEAGE;
     }
 
-    @Override
-    public RowType rowType() {
-        List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "database_name", new VarCharType(VarCharType.MAX_LENGTH)));
-        fields.add(new DataField(1, "table_name", new VarCharType(VarCharType.MAX_LENGTH)));
-        fields.add(new DataField(2, "job_name", new VarCharType(VarCharType.MAX_LENGTH)));
-        fields.add(new DataField(3, "create_time", new TimestampType()));
-        return new RowType(fields);
-    }
-
-    @Override
-    public List<String> primaryKeys() {
-        return Arrays.asList("database_name", "table_name", "job_name");
-    }
-
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
         return new SourceTableLineageTable(lineageMetaFactory, options);
     }
-
-    /** Source table lineage read. */
-    private static class SourceTableLineageRead implements InnerTableRead {
-        private final LineageMetaFactory lineageMetaFactory;
-        private final Map<String, String> options;
-        @Nullable private Predicate predicate;
-        private int[][] projection;
-
-        private SourceTableLineageRead(
-                LineageMetaFactory lineageMetaFactory, Map<String, String> options) {
-            this.lineageMetaFactory = lineageMetaFactory;
-            this.options = options;
-            this.predicate = null;
-        }
-
-        @Override
-        public InnerTableRead withFilter(Predicate predicate) {
-            this.predicate = predicate;
-            return this;
-        }
-
-        @Override
-        public InnerTableRead withProjection(int[][] projection) {
-            this.projection = projection;
-            return this;
-        }
-
-        @Override
-        public TableRead withIOManager(IOManager ioManager) {
-            return this;
-        }
-
-        @Override
-        public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            try (LineageMeta lineageMeta =
-                    lineageMetaFactory.create(() -> Options.fromMap(options))) {
-                Iterator<TableLineageEntity> sourceTableLineages =
-                        lineageMeta.sourceTableLineages(predicate);
-                return new IteratorRecordReader<>(
-                        Iterators.transform(
-                                sourceTableLineages,
-                                entity -> {
-                                    checkNotNull(entity);
-                                    GenericRow row =
-                                            GenericRow.of(
-                                                    BinaryString.fromString(entity.getDatabase()),
-                                                    BinaryString.fromString(entity.getTable()),
-                                                    BinaryString.fromString(entity.getJob()),
-                                                    entity.getCreateTime());
-                                    if (projection != null) {
-                                        return ProjectedRow.from(projection).replaceRow(row);
-                                    } else {
-                                        return row;
-                                    }
-                                }));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 91c7b92f7..3079c7284 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -39,6 +39,7 @@ import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
+import static org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
 import static org.apache.paimon.table.system.SourceTableLineageTable.SOURCE_TABLE_LINEAGE;
 import static org.apache.paimon.table.system.TagsTable.TAGS;
@@ -95,6 +96,15 @@ public class SystemTableLoader {
                                     LINEAGE_META.key()));
                     return new SourceTableLineageTable(lineageMetaFactory, catalogOptions);
                 }
+            case SINK_TABLE_LINEAGE:
+                {
+                    checkNotNull(
+                            lineageMetaFactory,
+                            String.format(
+                                    "Lineage meta should be configured for catalog with %s",
+                                    LINEAGE_META.key()));
+                    return new SinkTableLineageTable(lineageMetaFactory, catalogOptions);
+                }
             default:
                 return null;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
similarity index 79%
copy from paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
copy to paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
index 0cac3852f..3f43a764b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
@@ -29,7 +29,6 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.ReadonlyTable;
-import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
@@ -53,30 +52,16 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/**
- * This is a system table to display all the source table lineages.
- *
- * <pre>
- *  For example:
- *     If we select * from sys.source_table_lineage, we will get
- *     database_name       table_name       job_name      create_time
- *        default            test0            job1    2023-10-22 20:35:12
- *       database1           test1            job1    2023-10-28 21:35:52
- *          ...               ...             ...             ...
- *     We can write sql to fetch the information we need.
- * </pre>
- */
-public class SourceTableLineageTable implements ReadonlyTable {
-
-    public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
-
-    private final LineageMetaFactory lineageMetaFactory;
-    private final Map<String, String> options;
+/** Base lineage table for source and sink table lineage. */
+public abstract class TableLineageTable implements ReadonlyTable {
+    protected final LineageMetaFactory lineageMetaFactory;
+    protected final Map<String, String> options;
 
-    public SourceTableLineageTable(
+    protected TableLineageTable(
             LineageMetaFactory lineageMetaFactory, Map<String, String> options) {
         this.lineageMetaFactory = lineageMetaFactory;
         this.options = options;
@@ -98,16 +83,6 @@ public class SourceTableLineageTable implements ReadonlyTable {
         };
     }
 
-    @Override
-    public InnerTableRead newRead() {
-        return new SourceTableLineageRead(lineageMetaFactory, options);
-    }
-
-    @Override
-    public String name() {
-        return SOURCE_TABLE_LINEAGE;
-    }
-
     @Override
     public RowType rowType() {
         List<DataField> fields = new ArrayList<>();
@@ -123,22 +98,23 @@ public class SourceTableLineageTable implements ReadonlyTable {
         return Arrays.asList("database_name", "table_name", "job_name");
     }
 
-    @Override
-    public Table copy(Map<String, String> dynamicOptions) {
-        return new SourceTableLineageTable(lineageMetaFactory, options);
-    }
-
-    /** Source table lineage read. */
-    private static class SourceTableLineageRead implements InnerTableRead {
+    /** Table lineage read with lineage meta query. */
+    protected static class TableLineageRead implements InnerTableRead {
         private final LineageMetaFactory lineageMetaFactory;
         private final Map<String, String> options;
+        private final BiFunction<LineageMeta, Predicate, Iterator<TableLineageEntity>>
+                tableLineageQuery;
         @Nullable private Predicate predicate;
         private int[][] projection;
 
-        private SourceTableLineageRead(
-                LineageMetaFactory lineageMetaFactory, Map<String, String> options) {
+        protected TableLineageRead(
+                LineageMetaFactory lineageMetaFactory,
+                Map<String, String> options,
+                BiFunction<LineageMeta, Predicate, Iterator<TableLineageEntity>>
+                        tableLineageQuery) {
             this.lineageMetaFactory = lineageMetaFactory;
             this.options = options;
+            this.tableLineageQuery = tableLineageQuery;
             this.predicate = null;
         }
 
@@ -164,7 +140,7 @@ public class SourceTableLineageTable implements ReadonlyTable {
             try (LineageMeta lineageMeta =
                     lineageMetaFactory.create(() -> Options.fromMap(options))) {
                 Iterator<TableLineageEntity> sourceTableLineages =
-                        lineageMeta.sourceTableLineages(predicate);
+                        tableLineageQuery.apply(lineageMeta, predicate);
                 return new IteratorRecordReader<>(
                         Iterators.transform(
                                 sourceTableLineages,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
index 0625facbf..5b61d5272 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
@@ -48,6 +48,8 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
     private static final String THROWING_META = "throwing-meta";
     private static final Map<String, Map<String, TableLineageEntity>> jobSourceTableLineages =
             new HashMap<>();
+    private static final Map<String, Map<String, TableLineageEntity>> jobSinkTableLineages =
+            new HashMap<>();
 
     @Override
     protected List<String> ddl() {
@@ -72,10 +74,24 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
 
         // Call storeSinkTableLineage and storeSourceTableLineage methods
         tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "insert_t_job");
-        assertThatThrownBy(
-                        () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await())
-                .hasRootCauseInstanceOf(UnsupportedOperationException.class)
-                .hasRootCauseMessage("Method saveSinkTableLineage is not supported");
+        tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
+        assertThat(jobSinkTableLineages).isNotEmpty();
+        TableLineageEntity sinkTableLineage =
+                jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job");
+        assertThat(sinkTableLineage.getTable()).isEqualTo("T");
+
+        List<Row> sinkTableRows = new ArrayList<>();
+        try (CloseableIterator<Row> iterator =
+                tEnv.executeSql("SELECT * FROM sys.sink_table_lineage").collect()) {
+            while (iterator.hasNext()) {
+                sinkTableRows.add(iterator.next());
+            }
+        }
+        assertThat(sinkTableRows.size()).isEqualTo(1);
+        Row sinkTableRow = sinkTableRows.get(0);
+        assertThat(sinkTableRow.getField("database_name")).isEqualTo("default");
+        assertThat(sinkTableRow.getField("table_name")).isEqualTo("T");
+        assertThat(sinkTableRow.getField("job_name")).isEqualTo("insert_t_job");
 
         tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "select_t_job");
         tEnv.executeSql("SELECT * FROM T").collect().close();
@@ -84,18 +100,18 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
                 
jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job");
         assertThat(sourceTableLineage.getTable()).isEqualTo("T");
 
-        List<Row> lineageRows = new ArrayList<>();
+        List<Row> sourceTableRows = new ArrayList<>();
         try (CloseableIterator<Row> iterator =
                 tEnv.executeSql("SELECT * FROM sys.source_table_lineage").collect()) {
             while (iterator.hasNext()) {
-                lineageRows.add(iterator.next());
+                sourceTableRows.add(iterator.next());
             }
         }
-        assertThat(lineageRows.size()).isEqualTo(1);
-        Row lineageRow = lineageRows.get(0);
-        assertThat(lineageRow.getField("database_name")).isEqualTo("default");
-        assertThat(lineageRow.getField("table_name")).isEqualTo("T");
-        assertThat(lineageRow.getField("job_name")).isEqualTo("select_t_job");
+        assertThat(sourceTableRows.size()).isEqualTo(1);
+        Row sourceTableRow = sourceTableRows.get(0);
+        assertThat(sourceTableRow.getField("database_name")).isEqualTo("default");
+        assertThat(sourceTableRow.getField("table_name")).isEqualTo("T");
+        assertThat(sourceTableRow.getField("job_name")).isEqualTo("select_t_job");
     }
 
     private static String getTableLineageKey(TableLineageEntity entity) {
@@ -144,17 +160,21 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
             assertThat(entity.getJob()).isEqualTo("insert_t_job");
             assertThat(entity.getTable()).isEqualTo("T");
             assertThat(entity.getDatabase()).isEqualTo("default");
-            throw new UnsupportedOperationException("Method saveSinkTableLineage is not supported");
+            jobSinkTableLineages
+                    .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
+                    .put(getTableLineageKey(entity), entity);
         }
 
         @Override
         public Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate) {
-            throw new UnsupportedOperationException();
+            return jobSinkTableLineages.values().stream()
+                    .flatMap(v -> v.values().stream())
+                    .iterator();
         }
 
         @Override
         public void deleteSinkTableLineage(String job) {
-            throw new UnsupportedOperationException();
+            jobSinkTableLineages.remove(job);
         }
 
         @Override

Reply via email to