This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6f317a528 [core] support count on files and manifest list name (#1774)
6f317a528 is described below
commit 6f317a5288576c773e237f8a0cb3ebc105c0f0c1
Author: GuojunLi <[email protected]>
AuthorDate: Thu Aug 10 11:52:02 2023 +0800
[core] support count on files and manifest list name (#1774)
* [core] support count on files and manifest list name
* [core] Fix checkstyle
* [core] Fix checkstyle of java doc
---
docs/content/how-to/system-tables.md | 12 +-
.../apache/paimon/table/system/SnapshotsTable.java | 51 ++++++--
.../paimon/table/system/SystemTableLoader.java | 2 +-
.../paimon/table/system/SnapshotsTableTest.java | 131 +++++++++++++++++++++
4 files changed, 179 insertions(+), 17 deletions(-)
diff --git a/docs/content/how-to/system-tables.md b/docs/content/how-to/system-tables.md
index 5db5ff9f8..7f25accd7 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -45,12 +45,12 @@ You can query the snapshot history information of the table through snapshots ta
SELECT * FROM MyTable$snapshots;
/*
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+----------------+
-| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | total_record_count | delta_record_count | changelog_record_count | watermark |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+----------------+
-| 2 | 0 | 7ca4cd28-98e... | 2 | APPEND | 2022-10-26 11:44:15.600 | 2 | 2 | 0 | 1666755855600 |
-| 1 | 0 | 870062aa-3e9... | 1 | APPEND | 2022-10-26 11:44:15.148 | 1 | 1 | 0 | 1666755855148 |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+----------------+
++--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+-------------------------------+--------------------------------+---------------------+---------------------+-------------------------+-------------------+--------------------+----------------+
+| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | base_manifest_list | delta_manifest_list | changelog_manifest_list | total_record_count | delta_record_count | changelog_record_count | added_file_count | delete_file_count | watermark |
++--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+-------------------------------+--------------------------------+---------------------+---------------------+-------------------------+-------------------+--------------------+----------------+
+| 2 | 0 | 7ca4cd28-98e... | 2 | APPEND | 2022-10-26 11:44:15.600 | manifest-list-31323d5f-76e6... | manifest-list-31323d5f-76e6... | manifest-list-31323d5f-76e6... | 2 | 2 | 0 | 2 | 0 | 1666755855600 |
+| 1 | 0 | 870062aa-3e9... | 1 | APPEND | 2022-10-26 11:44:15.148 | manifest-list-31593d5f-76e6... | manifest-list-31593d5f-76e6... | manifest-list-31593d5f-76e6... | 1 | 1 | 0 | 1 | 0 | 1666755855148 |
++--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+-------------------------------+--------------------------------+---------------------+---------------------+-------------------------+-------------------+--------------------+----------------+
2 rows in set
*/
```
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 0a73bbebd..944c8e605 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -26,8 +26,11 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
@@ -37,6 +40,7 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.IteratorRecordReader;
@@ -77,17 +81,34 @@ public class SnapshotsTable implements ReadonlyTable {
new DataField(
4, "commit_kind",
SerializationUtils.newStringType(false)),
new DataField(5, "commit_time", new
TimestampType(false, 3)),
- new DataField(6, "total_record_count", new
BigIntType(true)),
- new DataField(7, "delta_record_count", new
BigIntType(true)),
- new DataField(8, "changelog_record_count", new
BigIntType(true)),
- new DataField(9, "watermark", new
BigIntType(true))));
+ new DataField(
+ 6,
+ "base_manifest_list",
+ SerializationUtils.newStringType(false)),
+ new DataField(
+ 7,
+ "delta_manifest_list",
+ SerializationUtils.newStringType(false)),
+ new DataField(
+ 8,
+ "changelog_manifest_list",
+ SerializationUtils.newStringType(true)),
+ new DataField(9, "total_record_count", new
BigIntType(true)),
+ new DataField(10, "delta_record_count", new
BigIntType(true)),
+ new DataField(11, "changelog_record_count", new
BigIntType(true)),
+ new DataField(12, "added_file_count", new
IntType(true)),
+ new DataField(13, "delete_file_count", new
IntType(true)),
+ new DataField(14, "watermark", new
BigIntType(true))));
private final FileIO fileIO;
private final Path location;
- public SnapshotsTable(FileIO fileIO, Path location) {
+ private final FileStoreTable dataTable;
+
+ public SnapshotsTable(FileIO fileIO, Path location, FileStoreTable
dataTable) {
this.fileIO = fileIO;
this.location = location;
+ this.dataTable = dataTable;
}
@Override
@@ -112,12 +133,12 @@ public class SnapshotsTable implements ReadonlyTable {
@Override
public InnerTableRead newRead() {
- return new SnapshotsRead(fileIO);
+ return new SnapshotsRead(fileIO, dataTable);
}
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new SnapshotsTable(fileIO, location);
+ return new SnapshotsTable(fileIO, location,
dataTable.copy(dynamicOptions));
}
private class SnapshotsScan extends ReadOnceTableScan {
@@ -178,8 +199,11 @@ public class SnapshotsTable implements ReadonlyTable {
private final FileIO fileIO;
private int[][] projection;
- public SnapshotsRead(FileIO fileIO) {
+ private final FileStoreTable dataTable;
+
+ public SnapshotsRead(FileIO fileIO, FileStoreTable dataTable) {
this.fileIO = fileIO;
+ this.dataTable = dataTable;
}
@Override
@@ -206,7 +230,8 @@ public class SnapshotsTable implements ReadonlyTable {
}
Path location = ((SnapshotsSplit) split).location;
Iterator<Snapshot> snapshots = new SnapshotManager(fileIO,
location).snapshots();
- Iterator<InternalRow> rows = Iterators.transform(snapshots,
this::toRow);
+ Iterator<InternalRow> rows =
+ Iterators.transform(snapshots, snapshot -> toRow(snapshot,
dataTable));
if (projection != null) {
rows =
Iterators.transform(
@@ -215,7 +240,8 @@ public class SnapshotsTable implements ReadonlyTable {
return new IteratorRecordReader<>(rows);
}
- private InternalRow toRow(Snapshot snapshot) {
+ private InternalRow toRow(Snapshot snapshot, FileStoreTable dataTable)
{
+ FileStoreScan.Plan plan =
dataTable.store().newScan().withSnapshot(snapshot).plan();
return GenericRow.of(
snapshot.id(),
snapshot.schemaId(),
@@ -226,9 +252,14 @@ public class SnapshotsTable implements ReadonlyTable {
LocalDateTime.ofInstant(
Instant.ofEpochMilli(snapshot.timeMillis()),
ZoneId.systemDefault())),
+ BinaryString.fromString(snapshot.baseManifestList()),
+ BinaryString.fromString(snapshot.deltaManifestList()),
+ BinaryString.fromString(snapshot.changelogManifestList()),
snapshot.totalRecordCount(),
snapshot.deltaRecordCount(),
snapshot.changelogRecordCount(),
+ plan.files(FileKind.ADD).size(),
+ plan.files(FileKind.DELETE).size(),
snapshot.watermark());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 49e99e696..3d7daea9c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -48,7 +48,7 @@ public class SystemTableLoader {
case MANIFESTS:
return new ManifestsTable(fileIO, location, dataTable);
case SNAPSHOTS:
- return new SnapshotsTable(fileIO, location);
+ return new SnapshotsTable(fileIO, location, dataTable);
case OPTIONS:
return new OptionsTable(fileIO, location);
case SCHEMAS:
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
new file mode 100644
index 000000000..4bb5f462e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SnapshotsTable}. */
+public class SnapshotsTableTest extends TableTestBase {
+ // Name of the data table whose snapshots system table is exercised.
+ private static final String tableName = "MyTable";
+
+ private FileStoreTable table;
+ // Shared scan used only to build expected rows; re-targeted per snapshot in getExceptedResult.
+ private FileStoreScan scan;
+ private SnapshotsTable snapshotsTable;
+ private SnapshotManager snapshotManager;
+
+ /**
+ * Creates a primary-key table (pk, pt), partitioned by pt, with 2 buckets and the "input"
+ * changelog producer, resolves its snapshots system table through the catalog, and commits
+ * two snapshots via {@code write}.
+ */
+ @BeforeEach
+ public void before() throws Exception {
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, tableName));
+ Schema schema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option(CoreOptions.BUCKET.key(), "2")
+ .build();
+ snapshotManager = new SnapshotManager(fileIO, tablePath);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath),
schema);
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
+ scan = table.store().newScan();
+
+ // NOTE(review): despite its name, this identifier points at the snapshots system table
+ // (SnapshotsTable.SNAPSHOTS), not a files table; "snapshotsTableId" would be clearer.
+ Identifier filesTableId =
+ identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER +
SnapshotsTable.SNAPSHOTS);
+ snapshotsTable = (SnapshotsTable) catalog.getTable(filesTableId);
+
+ // snapshot 1: append
+ write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 5));
+
+ // snapshot 2: append
+ write(table, GenericRow.of(2, 1, 3), GenericRow.of(2, 2, 4));
+ }
+
+ /** Reads every row of the snapshots system table and checks it against snapshots 1 and 2. */
+ @Test
+ public void testReadSnapshotsFromLatest() throws Exception {
+ List<InternalRow> expectedRow = getExceptedResult(new long[] {1, 2});
+ List<InternalRow> result = read(snapshotsTable);
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
+ }
+
+ /**
+ * Builds the expected system-table rows for the given snapshot ids, in the same column order
+ * as the snapshots table schema: id, schema id, commit user/identifier/kind, commit time,
+ * the three manifest-list names, the three record counts, added/deleted file counts, and
+ * watermark.
+ *
+ * <p>NOTE(review): "Excepted" is presumably a typo for "Expected".
+ *
+ * <p>NOTE(review): the file counts are derived with the same {@code withSnapshot(...).plan()}
+ * approach that SnapshotsTable#toRow uses, so this test cannot catch a wrong definition of
+ * added_file_count / delete_file_count. If plan() returns the merged live-file set, DELETE
+ * entries would never appear and delete_file_count would always be 0. Consider asserting
+ * literal counts instead. Verify.
+ *
+ * @param snapshotIds ids of snapshots expected to be present in the table
+ * @return one expected row per snapshot id
+ */
+ private List<InternalRow> getExceptedResult(long[] snapshotIds) {
+ List<InternalRow> expectedRow = new ArrayList<>();
+ for (long snapshotId : snapshotIds) {
+ // Reuses the shared scan; assumes withSnapshot fully replaces the previous target -- confirm.
+ FileStoreScan.Plan plan = scan.withSnapshot(snapshotId).plan();
+ Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+ expectedRow.add(
+ GenericRow.of(
+ snapshotId,
+ snapshot.schemaId(),
+ BinaryString.fromString(snapshot.commitUser()),
+ snapshot.commitIdentifier(),
+
BinaryString.fromString(snapshot.commitKind().toString()),
+ // commit_time is rendered in the JVM's default time zone.
+ Timestamp.fromLocalDateTime(
+ LocalDateTime.ofInstant(
+
Instant.ofEpochMilli(snapshot.timeMillis()),
+ ZoneId.systemDefault())),
+
BinaryString.fromString(snapshot.baseManifestList()),
+
BinaryString.fromString(snapshot.deltaManifestList()),
+
BinaryString.fromString(snapshot.changelogManifestList()),
+ snapshot.totalRecordCount(),
+ snapshot.deltaRecordCount(),
+ snapshot.changelogRecordCount(),
+ plan.files(FileKind.ADD).size(),
+ plan.files(FileKind.DELETE).size(),
+ snapshot.watermark()));
+ }
+
+ return expectedRow;
+ }
+}