This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 67df9b2a4 [core] Support for query manifests (#1399)
67df9b2a4 is described below

commit 67df9b2a48993f0703ffa9df7bac18956a75f42e
Author: xwmr-max <[email protected]>
AuthorDate: Tue Jul 4 10:14:36 2023 +0800

    [core] Support for query manifests (#1399)
---
 .../apache/paimon/table/system/ManifestsTable.java | 252 +++++++++++++++++++++
 .../paimon/table/system/SystemTableLoader.java     |   3 +
 .../apache/paimon/flink/CatalogTableITCase.java    |  26 +++
 .../org/apache/paimon/spark/SparkReadITCase.java   |  24 ++
 4 files changed, 305 insertions(+)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
new file mode 100644
index 000000000..f1cc8bce9
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.SerializationUtils;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/** A {@link Table} for showing the manifest files of the latest snapshot of a table. */
+public class ManifestsTable implements ReadonlyTable {
+    private static final long serialVersionUID = 1L;
+
+    public static final String MANIFESTS = "manifests";
+
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "file_name", 
SerializationUtils.newStringType(false)),
+                            new DataField(1, "file_size", new 
BigIntType(false)),
+                            new DataField(2, "num_added_files", new 
BigIntType(false)),
+                            new DataField(3, "num_deleted_files", new 
BigIntType(false)),
+                            new DataField(4, "schema_id", new 
BigIntType(false))));
+
+    private final FileIO fileIO;
+    private final Path location;
+    private final Table dataTable;
+
+    /**
+     * Creates the manifests system table on top of a data table.
+     *
+     * @param fileIO file access handed to the scan, the read and the manifest loader
+     * @param location path under which snapshots and manifests are looked up
+     * @param dataTable data table whose options supply the manifest file format
+     */
+    public ManifestsTable(FileIO fileIO, Path location, Table dataTable) {
+        this.fileIO = fileIO;
+        this.location = location;
+        this.dataTable = dataTable;
+    }
+
+    /** Returns a new {@code ManifestsScan} bound to this table instance. */
+    @Override
+    public InnerTableScan newScan() {
+        return new ManifestsScan();
+    }
+
+    /** Returns a new {@code ManifestsRead} using this table's file IO and data table. */
+    @Override
+    public InnerTableRead newRead() {
+        return new ManifestsRead(fileIO, dataTable);
+    }
+
+    /**
+     * Returns {@code location.getName()} followed by the system table splitter and {@link
+     * #MANIFESTS}.
+     */
+    @Override
+    public String name() {
+        return location.getName() + SYSTEM_TABLE_SPLITTER + MANIFESTS;
+    }
+
+    /** Returns the fixed schema {@link #TABLE_TYPE}; it does not depend on the data table. */
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    /** Declares {@code file_name} as the single primary key column of this table. */
+    @Override
+    public List<String> primaryKeys() {
+        return Collections.singletonList("file_name");
+    }
+
+    /**
+     * Returns a new table over the same file IO, location and data table.
+     *
+     * <p>NOTE: {@code dynamicOptions} is not used here, so the copy behaves like this instance.
+     */
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new ManifestsTable(fileIO, location, dataTable);
+    }
+
+    /**
+     * Scan of the manifests table. It is an inner (non-static) class because the plan is built
+     * from the enclosing table's {@code fileIO}, {@code location} and {@code dataTable}.
+     */
+    private class ManifestsScan extends ReadOnceTableScan {
+
+        @Override
+        public InnerTableScan withFilter(Predicate predicate) {
+            // TODO: filter push-down is not implemented yet, the predicate is ignored.
+            return this;
+        }
+
+        /** Plans exactly one {@code ManifestsSplit} covering the whole table. */
+        @Override
+        protected Plan innerPlan() {
+            return () -> Collections.singletonList(new ManifestsSplit(fileIO, 
location, dataTable));
+        }
+    }
+
+    /** The only kind of {@link Split} planned for the manifests table. */
+    private static class ManifestsSplit implements Split {
+
+        private static final long serialVersionUID = 1L;
+
+        private final FileIO fileIO;
+        private final Path location;
+        private final Table dataTable;
+
+        private ManifestsSplit(FileIO fileIO, Path location, Table dataTable) {
+            this.fileIO = fileIO;
+            this.location = location;
+            this.dataTable = dataTable;
+        }
+
+        /** Returns the number of manifest file metas, loaded through a fresh getter. */
+        @Override
+        public long rowCount() {
+            StatsManifestsGetter getter = new StatsManifestsGetter(fileIO, location, dataTable);
+            return getter.manifestFileMetas().size();
+        }
+
+        /** Two splits are equal when they have the same class and the same location. */
+        @Override
+        public boolean equals(Object other) {
+            if (other == this) {
+                return true;
+            }
+            if (other == null || other.getClass() != getClass()) {
+                return false;
+            }
+            return Objects.equals(location, ((ManifestsSplit) other).location);
+        }
+
+        /** Hash derived from the location only, consistent with {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            return Objects.hash(location);
+        }
+    }
+
+    /**
+     * {@link InnerTableRead} that emits one row of {@link ManifestsTable#TABLE_TYPE} per {@link
+     * ManifestFileMeta} returned by {@code StatsManifestsGetter}.
+     */
+    private static class ManifestsRead implements InnerTableRead {
+
+        private final FileIO fileIO;
+        private final Table dataTable;
+
+        /** Projection applied to every produced row; {@code null} keeps all fields. */
+        private int[][] projection;
+
+        public ManifestsRead(FileIO fileIO, Table dataTable) {
+            this.fileIO = fileIO;
+            this.dataTable = dataTable;
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            // TODO: filter push-down is not implemented yet, the predicate is ignored.
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withProjection(int[][] projection) {
+            this.projection = projection;
+            return this;
+        }
+
+        /**
+         * Creates a reader over the manifest file metas found under the split's location.
+         *
+         * @param split must be a {@code ManifestsSplit}
+         * @return a reader emitting one (optionally projected) row per manifest file
+         * @throws IllegalArgumentException if {@code split} is of any other type
+         */
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            if (!(split instanceof ManifestsSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + split.getClass());
+            }
+            Path location = ((ManifestsSplit) split).location;
+            // The getter resolves the latest snapshot itself, so it is not loaded a second
+            // time here; that extra lookup was unused and only cost file system I/O.
+            List<ManifestFileMeta> manifestFileMetas =
+                    new StatsManifestsGetter(fileIO, location, dataTable).manifestFileMetas();
+
+            Iterator<InternalRow> rows =
+                    Iterators.transform(manifestFileMetas.iterator(), this::toRow);
+            if (projection != null) {
+                rows =
+                        Iterators.transform(
+                                rows, row -> ProjectedRow.from(projection).replaceRow(row));
+            }
+            return new IteratorRecordReader<>(rows);
+        }
+
+        /** Maps one manifest file meta to a row in {@link ManifestsTable#TABLE_TYPE} order. */
+        private InternalRow toRow(ManifestFileMeta manifestFileMeta) {
+            return GenericRow.of(
+                    BinaryString.fromString(manifestFileMeta.fileName()),
+                    manifestFileMeta.fileSize(),
+                    manifestFileMeta.numAddedFiles(),
+                    manifestFileMeta.numDeletedFiles(),
+                    manifestFileMeta.schemaId());
+        }
+    }
+
+    /** Lazily loads the manifest file metas referenced by the latest snapshot under a path. */
+    private static class StatsManifestsGetter {
+        private final FileIO fileIO;
+        private final Table dataTable;
+        private final Path location;
+
+        /** Cached result of {@link #initialize()}; {@code null} until first requested. */
+        private List<ManifestFileMeta> manifestFileMetas;
+
+        private StatsManifestsGetter(FileIO fileIO, Path location, Table dataTable) {
+            this.fileIO = fileIO;
+            this.location = location;
+            this.dataTable = dataTable;
+        }
+
+        /**
+         * Reads the latest snapshot and stores all of its manifest file metas in the cache. An
+         * empty list is stored when there is no snapshot.
+         */
+        private void initialize() {
+            Snapshot snapshot = new SnapshotManager(fileIO, location).latestSnapshot();
+            if (snapshot == null) {
+                // latestSnapshot() is nullable: a table that was never committed to has no
+                // snapshot, so there are no manifests to list (previously this threw an NPE).
+                manifestFileMetas = Collections.emptyList();
+                return;
+            }
+            FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(location);
+            CoreOptions coreOptions = CoreOptions.fromMap(dataTable.options());
+            FileFormat fileFormat = coreOptions.manifestFormat();
+            ManifestList manifestList =
+                    new ManifestList.Factory(fileIO, fileFormat, fileStorePathFactory, null)
+                            .create();
+            manifestFileMetas = snapshot.allManifests(manifestList);
+        }
+
+        /** Returns the manifest file metas, loading them on first access. Never {@code null}. */
+        private List<ManifestFileMeta> manifestFileMetas() {
+            if (manifestFileMetas == null) {
+                initialize();
+            }
+            return manifestFileMetas;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 55d684208..2bfc39fad 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
 import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
 import static org.apache.paimon.table.system.FilesTable.FILES;
+import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
@@ -40,6 +41,8 @@ public class SystemTableLoader {
     public static Table load(String type, FileIO fileIO, FileStoreTable 
dataTable) {
         Path location = dataTable.location();
         switch (type.toLowerCase()) {
+            case MANIFESTS:
+                return new ManifestsTable(fileIO, location, dataTable);
             case SNAPSHOTS:
                 return new SnapshotsTable(fileIO, location);
             case OPTIONS:
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 95b36fd70..746d097ab 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -98,6 +98,32 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                 + "'Identifier{database='default', 
table='T$aa$bb'}', please use data table.");
     }
 
+    /** Checks schema id, file name prefix and file size of every row of {@code T$manifests}. */
+    @Test
+    public void testManifestsTable() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT)");
+        sql("INSERT INTO T VALUES (1, 2)");
+
+        List<Row> result = sql("SELECT schema_id, file_name, file_size FROM T$manifests");
+
+        // Guard against a vacuous pass: the per-row assertions never run on an empty result.
+        assertThat(result).isNotEmpty();
+        result.forEach(
+                row -> {
+                    assertThat((long) row.getField(0)).isEqualTo(0L);
+                    assertThat((String) row.getField(1)).startsWith("manifest");
+                    assertThat((long) row.getField(2)).isGreaterThan(0L);
+                });
+    }
+
+    /**
+     * After two single-row inserts, expects {@code T$manifests} to list exactly two manifests,
+     * each reporting one added file and no deleted file.
+     */
+    @Test
+    public void testManifestsTableWithFileCount() {
+        sql("CREATE TABLE T (a INT, b INT)");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (3, 4)");
+
+        List<Row> result = sql("SELECT num_added_files, num_deleted_files FROM 
T$manifests");
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1L, 0L), 
Row.of(1L, 0L));
+    }
+
     @Test
     public void testSchemasTable() throws Exception {
         sql(
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 3d51e3933..05f54e86b 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -112,6 +112,30 @@ public class SparkReadITCase extends SparkReadTestBase {
         assertThat(rows.toString()).isEqualTo("[[1,2,2,0]]");
     }
 
+    /** Checks schema id, file name prefix and file size of the first {@code t1$manifests} row. */
+    @Test
+    public void testManifestsTable() {
+        Row firstManifest =
+                spark.table("`t1$manifests`")
+                        .select("schema_id", "file_name", "file_size")
+                        .collectAsList()
+                        .get(0);
+        long schemaId = firstManifest.getLong(0);
+        String fileName = firstManifest.getString(1);
+        long fileSize = firstManifest.getLong(2);
+
+        assertThat(schemaId).isEqualTo(0L);
+        assertThat(fileName).startsWith("manifest");
+        assertThat(fileSize).isGreaterThan(0L);
+    }
+
+    /**
+     * Expects {@code t1$manifests} to contain exactly one row, reporting one added file and no
+     * deleted file; the comparison is done on the string rendering of the collected rows.
+     */
+    @Test
+    public void testManifestsTableWithRecordCount() {
+        List<Row> rows =
+                spark.table("`t1$manifests`")
+                        .select("num_added_files", "num_deleted_files")
+                        .collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,0]]");
+    }
+
     @Test
     public void testCatalogFilterPushDown() {
         innerTestSimpleTypeFilterPushDown(spark.table("t1"));

Reply via email to