This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a7047f0a7 [lineage] Add system sink table lineage meta table (#2243)
a7047f0a7 is described below
commit a7047f0a7a7532ccde549d9e6c85377bfc9a6c54
Author: Fang Yong <[email protected]>
AuthorDate: Mon Nov 6 09:55:51 2023 +0800
[lineage] Add system sink table lineage meta table (#2243)
---
.../paimon/table/system/SinkTableLineageTable.java | 64 ++++++++++
.../table/system/SourceTableLineageTable.java | 132 +--------------------
.../paimon/table/system/SystemTableLoader.java | 10 ++
...bleLineageTable.java => TableLineageTable.java} | 58 +++------
.../apache/paimon/flink/FlinkLineageITCase.java | 48 +++++---
5 files changed, 128 insertions(+), 184 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
new file mode 100644
index 000000000..72d6f0fe2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+
+import java.util.Map;
+
+/**
+ * System table listing all sink table lineages read from {@link LineageMeta#sinkTableLineages}.
+ *
+ * <pre>
+ * For example:
+ * If we select * from sys.sink_table_lineage, we will get
+ * database_name  table_name  job_name  create_time
+ * default        test0       job1      2023-10-22 20:35:12
+ * database1      test1       job1      2023-10-28 21:35:52
+ * ...            ...         ...       ...
+ * Write SQL against this table to fetch the information you need.
+ * </pre>
+ */
+public class SinkTableLineageTable extends TableLineageTable {
+
+    public static final String SINK_TABLE_LINEAGE = "sink_table_lineage";
+
+    public SinkTableLineageTable(
+            LineageMetaFactory lineageMetaFactory, Map<String, String> options) {
+        super(lineageMetaFactory, options);
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new TableLineageRead(lineageMetaFactory, options, LineageMeta::sinkTableLineages);
+    }
+
+    @Override
+    public String name() {
+        return SINK_TABLE_LINEAGE;
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new SinkTableLineageTable(lineageMetaFactory, options);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
index 0cac3852f..bdd6af65b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
@@ -18,44 +18,13 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.ReadOnceTableScan;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.IteratorRecordReader;
-import org.apache.paimon.utils.ProjectedRow;
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-
/**
* This is a system table to display all the source table lineages.
*
@@ -69,38 +38,18 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
* We can write sql to fetch the information we need.
* </pre>
*/
-public class SourceTableLineageTable implements ReadonlyTable {
+public class SourceTableLineageTable extends TableLineageTable {
public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
- private final LineageMetaFactory lineageMetaFactory;
- private final Map<String, String> options;
-
public SourceTableLineageTable(
LineageMetaFactory lineageMetaFactory, Map<String, String>
options) {
- this.lineageMetaFactory = lineageMetaFactory;
- this.options = options;
- }
-
- @Override
- public InnerTableScan newScan() {
- return new ReadOnceTableScan() {
- @Override
- public InnerTableScan withFilter(Predicate predicate) {
- return this;
- }
-
- @Override
- protected Plan innerPlan() {
- /// TODO get the real row count for plan.
- return () -> Collections.singletonList((Split) () -> 1L);
- }
- };
+ super(lineageMetaFactory, options);
}
@Override
public InnerTableRead newRead() {
- return new SourceTableLineageRead(lineageMetaFactory, options);
+ return new TableLineageRead(lineageMetaFactory, options,
LineageMeta::sourceTableLineages);
}
@Override
@@ -108,83 +57,8 @@ public class SourceTableLineageTable implements ReadonlyTable {
return SOURCE_TABLE_LINEAGE;
}
- @Override
- public RowType rowType() {
- List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "database_name", new
VarCharType(VarCharType.MAX_LENGTH)));
- fields.add(new DataField(1, "table_name", new
VarCharType(VarCharType.MAX_LENGTH)));
- fields.add(new DataField(2, "job_name", new
VarCharType(VarCharType.MAX_LENGTH)));
- fields.add(new DataField(3, "create_time", new TimestampType()));
- return new RowType(fields);
- }
-
- @Override
- public List<String> primaryKeys() {
- return Arrays.asList("database_name", "table_name", "job_name");
- }
-
@Override
public Table copy(Map<String, String> dynamicOptions) {
return new SourceTableLineageTable(lineageMetaFactory, options);
}
-
- /** Source table lineage read. */
- private static class SourceTableLineageRead implements InnerTableRead {
- private final LineageMetaFactory lineageMetaFactory;
- private final Map<String, String> options;
- @Nullable private Predicate predicate;
- private int[][] projection;
-
- private SourceTableLineageRead(
- LineageMetaFactory lineageMetaFactory, Map<String, String>
options) {
- this.lineageMetaFactory = lineageMetaFactory;
- this.options = options;
- this.predicate = null;
- }
-
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
- this.predicate = predicate;
- return this;
- }
-
- @Override
- public InnerTableRead withProjection(int[][] projection) {
- this.projection = projection;
- return this;
- }
-
- @Override
- public TableRead withIOManager(IOManager ioManager) {
- return this;
- }
-
- @Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
- try (LineageMeta lineageMeta =
- lineageMetaFactory.create(() -> Options.fromMap(options)))
{
- Iterator<TableLineageEntity> sourceTableLineages =
- lineageMeta.sourceTableLineages(predicate);
- return new IteratorRecordReader<>(
- Iterators.transform(
- sourceTableLineages,
- entity -> {
- checkNotNull(entity);
- GenericRow row =
- GenericRow.of(
-
BinaryString.fromString(entity.getDatabase()),
-
BinaryString.fromString(entity.getTable()),
-
BinaryString.fromString(entity.getJob()),
- entity.getCreateTime());
- if (projection != null) {
- return
ProjectedRow.from(projection).replaceRow(row);
- } else {
- return row;
- }
- }));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 91c7b92f7..3079c7284 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -39,6 +39,7 @@ import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
+import static
org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
import static
org.apache.paimon.table.system.SourceTableLineageTable.SOURCE_TABLE_LINEAGE;
import static org.apache.paimon.table.system.TagsTable.TAGS;
@@ -95,6 +96,15 @@ public class SystemTableLoader {
LINEAGE_META.key()));
return new SourceTableLineageTable(lineageMetaFactory,
catalogOptions);
}
+ case SINK_TABLE_LINEAGE:
+ {
+ checkNotNull(
+ lineageMetaFactory,
+ String.format(
+ "Lineage meta should be configured for
catalog with %s",
+ LINEAGE_META.key()));
+ return new SinkTableLineageTable(lineageMetaFactory,
catalogOptions);
+ }
default:
return null;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
similarity index 79%
copy from paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
copy to paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
index 0cac3852f..3f43a764b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
@@ -29,7 +29,6 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.ReadonlyTable;
-import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
@@ -53,30 +52,16 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
-/**
- * This is a system table to display all the source table lineages.
- *
- * <pre>
- * For example:
- * If we select * from sys.source_table_lineage, we will get
- * database_name table_name job_name create_time
- * default test0 job1 2023-10-22 20:35:12
- * database1 test1 job1 2023-10-28 21:35:52
- * ... ... ... ...
- * We can write sql to fetch the information we need.
- * </pre>
- */
-public class SourceTableLineageTable implements ReadonlyTable {
-
- public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
-
- private final LineageMetaFactory lineageMetaFactory;
- private final Map<String, String> options;
+/** Base lineage table for source and sink table lineage. */
+public abstract class TableLineageTable implements ReadonlyTable {
+ protected final LineageMetaFactory lineageMetaFactory;
+ protected final Map<String, String> options;
- public SourceTableLineageTable(
+ protected TableLineageTable(
LineageMetaFactory lineageMetaFactory, Map<String, String>
options) {
this.lineageMetaFactory = lineageMetaFactory;
this.options = options;
@@ -98,16 +83,6 @@ public class SourceTableLineageTable implements ReadonlyTable {
};
}
- @Override
- public InnerTableRead newRead() {
- return new SourceTableLineageRead(lineageMetaFactory, options);
- }
-
- @Override
- public String name() {
- return SOURCE_TABLE_LINEAGE;
- }
-
@Override
public RowType rowType() {
List<DataField> fields = new ArrayList<>();
@@ -123,22 +98,23 @@ public class SourceTableLineageTable implements ReadonlyTable {
return Arrays.asList("database_name", "table_name", "job_name");
}
- @Override
- public Table copy(Map<String, String> dynamicOptions) {
- return new SourceTableLineageTable(lineageMetaFactory, options);
- }
-
- /** Source table lineage read. */
- private static class SourceTableLineageRead implements InnerTableRead {
+ /** Table lineage read with lineage meta query. */
+ protected static class TableLineageRead implements InnerTableRead {
private final LineageMetaFactory lineageMetaFactory;
private final Map<String, String> options;
+ private final BiFunction<LineageMeta, Predicate,
Iterator<TableLineageEntity>>
+ tableLineageQuery;
@Nullable private Predicate predicate;
private int[][] projection;
- private SourceTableLineageRead(
- LineageMetaFactory lineageMetaFactory, Map<String, String>
options) {
+ protected TableLineageRead(
+ LineageMetaFactory lineageMetaFactory,
+ Map<String, String> options,
+ BiFunction<LineageMeta, Predicate,
Iterator<TableLineageEntity>>
+ tableLineageQuery) {
this.lineageMetaFactory = lineageMetaFactory;
this.options = options;
+ this.tableLineageQuery = tableLineageQuery;
this.predicate = null;
}
@@ -164,7 +140,7 @@ public class SourceTableLineageTable implements ReadonlyTable {
try (LineageMeta lineageMeta =
lineageMetaFactory.create(() -> Options.fromMap(options)))
{
Iterator<TableLineageEntity> sourceTableLineages =
- lineageMeta.sourceTableLineages(predicate);
+ tableLineageQuery.apply(lineageMeta, predicate);
return new IteratorRecordReader<>(
Iterators.transform(
sourceTableLineages,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
index 0625facbf..5b61d5272 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
@@ -48,6 +48,8 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
private static final String THROWING_META = "throwing-meta";
private static final Map<String, Map<String, TableLineageEntity>>
jobSourceTableLineages =
new HashMap<>();
+ private static final Map<String, Map<String, TableLineageEntity>>
jobSinkTableLineages =
+ new HashMap<>();
@Override
protected List<String> ddl() {
@@ -72,10 +74,24 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
// Call storeSinkTableLineage and storeSourceTableLineage methods
tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME,
"insert_t_job");
- assertThatThrownBy(
- () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2,
3),(4, 5, 6);").await())
- .hasRootCauseInstanceOf(UnsupportedOperationException.class)
- .hasRootCauseMessage("Method saveSinkTableLineage is not
supported");
+ tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
+ assertThat(jobSinkTableLineages).isNotEmpty();
+ TableLineageEntity sinkTableLineage =
+
jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job");
+ assertThat(sinkTableLineage.getTable()).isEqualTo("T");
+
+ List<Row> sinkTableRows = new ArrayList<>();
+ try (CloseableIterator<Row> iterator =
+ tEnv.executeSql("SELECT * FROM
sys.sink_table_lineage").collect()) {
+ while (iterator.hasNext()) {
+ sinkTableRows.add(iterator.next());
+ }
+ }
+ assertThat(sinkTableRows.size()).isEqualTo(1);
+ Row sinkTableRow = sinkTableRows.get(0);
+
assertThat(sinkTableRow.getField("database_name")).isEqualTo("default");
+ assertThat(sinkTableRow.getField("table_name")).isEqualTo("T");
+
assertThat(sinkTableRow.getField("job_name")).isEqualTo("insert_t_job");
tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME,
"select_t_job");
tEnv.executeSql("SELECT * FROM T").collect().close();
@@ -84,18 +100,18 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job");
assertThat(sourceTableLineage.getTable()).isEqualTo("T");
- List<Row> lineageRows = new ArrayList<>();
+ List<Row> sourceTableRows = new ArrayList<>();
try (CloseableIterator<Row> iterator =
tEnv.executeSql("SELECT * FROM
sys.source_table_lineage").collect()) {
while (iterator.hasNext()) {
- lineageRows.add(iterator.next());
+ sourceTableRows.add(iterator.next());
}
}
- assertThat(lineageRows.size()).isEqualTo(1);
- Row lineageRow = lineageRows.get(0);
- assertThat(lineageRow.getField("database_name")).isEqualTo("default");
- assertThat(lineageRow.getField("table_name")).isEqualTo("T");
- assertThat(lineageRow.getField("job_name")).isEqualTo("select_t_job");
+ assertThat(sourceTableRows.size()).isEqualTo(1);
+ Row sourceTableRow = sourceTableRows.get(0);
+
assertThat(sourceTableRow.getField("database_name")).isEqualTo("default");
+ assertThat(sourceTableRow.getField("table_name")).isEqualTo("T");
+
assertThat(sourceTableRow.getField("job_name")).isEqualTo("select_t_job");
}
private static String getTableLineageKey(TableLineageEntity entity) {
@@ -144,17 +160,21 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
assertThat(entity.getJob()).isEqualTo("insert_t_job");
assertThat(entity.getTable()).isEqualTo("T");
assertThat(entity.getDatabase()).isEqualTo("default");
- throw new UnsupportedOperationException("Method
saveSinkTableLineage is not supported");
+ jobSinkTableLineages
+ .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
+ .put(getTableLineageKey(entity), entity);
}
@Override
public Iterator<TableLineageEntity> sinkTableLineages(@Nullable
Predicate predicate) {
- throw new UnsupportedOperationException();
+ return jobSinkTableLineages.values().stream()
+ .flatMap(v -> v.values().stream())
+ .iterator();
}
@Override
public void deleteSinkTableLineage(String job) {
- throw new UnsupportedOperationException();
+ jobSinkTableLineages.remove(job);
}
@Override