This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f317a528 [core] support count on files and manifest list name (#1774)
6f317a528 is described below

commit 6f317a5288576c773e237f8a0cb3ebc105c0f0c1
Author: GuojunLi <[email protected]>
AuthorDate: Thu Aug 10 11:52:02 2023 +0800

    [core] support count on files and manifest list name (#1774)
    
    * [core] support count on files and manifest list name
    
    * [core] Fix checkstyle
    
    * [core] Fix checkstyle of java doc
---
 docs/content/how-to/system-tables.md               |  12 +-
 .../apache/paimon/table/system/SnapshotsTable.java |  51 ++++++--
 .../paimon/table/system/SystemTableLoader.java     |   2 +-
 .../paimon/table/system/SnapshotsTableTest.java    | 131 +++++++++++++++++++++
 4 files changed, 179 insertions(+), 17 deletions(-)

diff --git a/docs/content/how-to/system-tables.md b/docs/content/how-to/system-tables.md
index 5db5ff9f8..7f25accd7 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -45,12 +45,12 @@ You can query the snapshot history information of the table through snapshots ta
 SELECT * FROM MyTable$snapshots;
 
 /*
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+----------------+
-|  snapshot_id |  schema_id |     commit_user | commit_identifier |  commit_kind |             commit_time |  total_record_count |  delta_record_count |  changelog_record_count |      watermark |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+----------------+
-|            2 |          0 | 7ca4cd28-98e... |                 2 |       APPEND | 2022-10-26 11:44:15.600 |                   2 |                   2 |                       0 |  1666755855600 |
-|            1 |          0 | 870062aa-3e9... |                 1 |       APPEND | 2022-10-26 11:44:15.148 |                   1 |                   1 |                       0 |  1666755855148 |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+----------------+
++--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+---------------------+---------------------+-------------------------+-------------------+--------------------+----------------+
+|  snapshot_id |  schema_id |     commit_user | commit_identifier |  commit_kind |             commit_time |             base_manifest_list |            delta_manifest_list |        changelog_manifest_list |  total_record_count |  delta_record_count |  changelog_record_count |  added_file_count |  delete_file_count |      watermark |
++--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+---------------------+---------------------+-------------------------+-------------------+--------------------+----------------+
+|            2 |          0 | 7ca4cd28-98e... |                 2 |       APPEND | 2022-10-26 11:44:15.600 | manifest-list-31323d5f-76e6... | manifest-list-31323d5f-76e6... | manifest-list-31323d5f-76e6... |                   2 |                   2 |                       0 |                 2 |                  0 |  1666755855600 |
+|            1 |          0 | 870062aa-3e9... |                 1 |       APPEND | 2022-10-26 11:44:15.148 | manifest-list-31593d5f-76e6... | manifest-list-31593d5f-76e6... | manifest-list-31593d5f-76e6... |                   1 |                   1 |                       0 |                 1 |                  0 |  1666755855148 |
++--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+---------------------+---------------------+-------------------------+-------------------+--------------------+----------------+
 2 rows in set
 */
 ```
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 0a73bbebd..944c8e605 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -26,8 +26,11 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
@@ -37,6 +40,7 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.IteratorRecordReader;
@@ -77,17 +81,34 @@ public class SnapshotsTable implements ReadonlyTable {
                             new DataField(
                                     4, "commit_kind", 
SerializationUtils.newStringType(false)),
                             new DataField(5, "commit_time", new 
TimestampType(false, 3)),
-                            new DataField(6, "total_record_count", new 
BigIntType(true)),
-                            new DataField(7, "delta_record_count", new 
BigIntType(true)),
-                            new DataField(8, "changelog_record_count", new 
BigIntType(true)),
-                            new DataField(9, "watermark", new 
BigIntType(true))));
+                            new DataField(
+                                    6,
+                                    "base_manifest_list",
+                                    SerializationUtils.newStringType(false)),
+                            new DataField(
+                                    7,
+                                    "delta_manifest_list",
+                                    SerializationUtils.newStringType(false)),
+                            new DataField(
+                                    8,
+                                    "changelog_manifest_list",
+                                    SerializationUtils.newStringType(true)),
+                            new DataField(9, "total_record_count", new 
BigIntType(true)),
+                            new DataField(10, "delta_record_count", new 
BigIntType(true)),
+                            new DataField(11, "changelog_record_count", new 
BigIntType(true)),
+                            new DataField(12, "added_file_count", new 
IntType(true)),
+                            new DataField(13, "delete_file_count", new 
IntType(true)),
+                            new DataField(14, "watermark", new 
BigIntType(true))));
 
     private final FileIO fileIO;
     private final Path location;
 
-    public SnapshotsTable(FileIO fileIO, Path location) {
+    private final FileStoreTable dataTable;
+
+    public SnapshotsTable(FileIO fileIO, Path location, FileStoreTable 
dataTable) {
         this.fileIO = fileIO;
         this.location = location;
+        this.dataTable = dataTable;
     }
 
     @Override
@@ -112,12 +133,12 @@ public class SnapshotsTable implements ReadonlyTable {
 
     @Override
     public InnerTableRead newRead() {
-        return new SnapshotsRead(fileIO);
+        return new SnapshotsRead(fileIO, dataTable);
     }
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new SnapshotsTable(fileIO, location);
+        return new SnapshotsTable(fileIO, location, 
dataTable.copy(dynamicOptions));
     }
 
     private class SnapshotsScan extends ReadOnceTableScan {
@@ -178,8 +199,11 @@ public class SnapshotsTable implements ReadonlyTable {
         private final FileIO fileIO;
         private int[][] projection;
 
-        public SnapshotsRead(FileIO fileIO) {
+        private final FileStoreTable dataTable;
+
+        public SnapshotsRead(FileIO fileIO, FileStoreTable dataTable) {
             this.fileIO = fileIO;
+            this.dataTable = dataTable;
         }
 
         @Override
@@ -206,7 +230,8 @@ public class SnapshotsTable implements ReadonlyTable {
             }
             Path location = ((SnapshotsSplit) split).location;
             Iterator<Snapshot> snapshots = new SnapshotManager(fileIO, 
location).snapshots();
-            Iterator<InternalRow> rows = Iterators.transform(snapshots, 
this::toRow);
+            Iterator<InternalRow> rows =
+                    Iterators.transform(snapshots, snapshot -> toRow(snapshot, 
dataTable));
             if (projection != null) {
                 rows =
                         Iterators.transform(
@@ -215,7 +240,8 @@ public class SnapshotsTable implements ReadonlyTable {
             return new IteratorRecordReader<>(rows);
         }
 
-        private InternalRow toRow(Snapshot snapshot) {
+        private InternalRow toRow(Snapshot snapshot, FileStoreTable dataTable) 
{
+            FileStoreScan.Plan plan = 
dataTable.store().newScan().withSnapshot(snapshot).plan();
             return GenericRow.of(
                     snapshot.id(),
                     snapshot.schemaId(),
@@ -226,9 +252,14 @@ public class SnapshotsTable implements ReadonlyTable {
                             LocalDateTime.ofInstant(
                                     
Instant.ofEpochMilli(snapshot.timeMillis()),
                                     ZoneId.systemDefault())),
+                    BinaryString.fromString(snapshot.baseManifestList()),
+                    BinaryString.fromString(snapshot.deltaManifestList()),
+                    BinaryString.fromString(snapshot.changelogManifestList()),
                     snapshot.totalRecordCount(),
                     snapshot.deltaRecordCount(),
                     snapshot.changelogRecordCount(),
+                    plan.files(FileKind.ADD).size(),
+                    plan.files(FileKind.DELETE).size(),
                     snapshot.watermark());
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 49e99e696..3d7daea9c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -48,7 +48,7 @@ public class SystemTableLoader {
             case MANIFESTS:
                 return new ManifestsTable(fileIO, location, dataTable);
             case SNAPSHOTS:
-                return new SnapshotsTable(fileIO, location);
+                return new SnapshotsTable(fileIO, location, dataTable);
             case OPTIONS:
                 return new OptionsTable(fileIO, location);
             case SCHEMAS:
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
new file mode 100644
index 000000000..4bb5f462e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SnapshotsTable}. */
+public class SnapshotsTableTest extends TableTestBase {
+    private static final String tableName = "MyTable";
+
+    private FileStoreTable table;
+    private FileStoreScan scan;
+    private SnapshotsTable snapshotsTable;
+    private SnapshotManager snapshotManager;
+
+    @BeforeEach
+    public void before() throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, tableName));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+                        .option(CoreOptions.BUCKET.key(), "2")
+                        .build();
+        snapshotManager = new SnapshotManager(fileIO, tablePath);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), 
schema);
+        table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
+        scan = table.store().newScan();
+
+        Identifier filesTableId =
+                identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER + 
SnapshotsTable.SNAPSHOTS);
+        snapshotsTable = (SnapshotsTable) catalog.getTable(filesTableId);
+
+        // snapshot 1: append
+        write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 5));
+
+        // snapshot 2: append
+        write(table, GenericRow.of(2, 1, 3), GenericRow.of(2, 2, 4));
+    }
+
+    @Test
+    public void testReadSnapshotsFromLatest() throws Exception {
+        List<InternalRow> expectedRow = getExceptedResult(new long[] {1, 2});
+        List<InternalRow> result = read(snapshotsTable);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
+    }
+
+    private List<InternalRow> getExceptedResult(long[] snapshotIds) {
+        List<InternalRow> expectedRow = new ArrayList<>();
+        for (long snapshotId : snapshotIds) {
+            FileStoreScan.Plan plan = scan.withSnapshot(snapshotId).plan();
+            Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+            expectedRow.add(
+                    GenericRow.of(
+                            snapshotId,
+                            snapshot.schemaId(),
+                            BinaryString.fromString(snapshot.commitUser()),
+                            snapshot.commitIdentifier(),
+                            
BinaryString.fromString(snapshot.commitKind().toString()),
+                            Timestamp.fromLocalDateTime(
+                                    LocalDateTime.ofInstant(
+                                            
Instant.ofEpochMilli(snapshot.timeMillis()),
+                                            ZoneId.systemDefault())),
+                            
BinaryString.fromString(snapshot.baseManifestList()),
+                            
BinaryString.fromString(snapshot.deltaManifestList()),
+                            
BinaryString.fromString(snapshot.changelogManifestList()),
+                            snapshot.totalRecordCount(),
+                            snapshot.deltaRecordCount(),
+                            snapshot.changelogRecordCount(),
+                            plan.files(FileKind.ADD).size(),
+                            plan.files(FileKind.DELETE).size(),
+                            snapshot.watermark()));
+        }
+
+        return expectedRow;
+    }
+}

Reply via email to