This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d0d9231 [core] Add latestSnapshotId to Table (#3771)
36d0d9231 is described below

commit 36d0d92316ee2e5a20cd9da3d036fcf5191880cc
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 19:05:55 2024 +0800

    [core] Add latestSnapshotId to Table (#3771)
---
 .../paimon/privilege/PrivilegedFileStoreTable.java |  6 +++
 .../paimon/table/AbstractFileStoreTable.java       |  7 +++
 .../org/apache/paimon/table/ReadonlyTable.java     |  9 ++++
 .../main/java/org/apache/paimon/table/Table.java   |  5 ++
 .../apache/paimon/table/system/AuditLogTable.java  | 44 ++++++++++--------
 .../apache/paimon/table/system/BucketsTable.java   |  6 +++
 .../paimon/table/system/FileMonitorTable.java      |  6 +++
 .../paimon/table/system/ReadOptimizedTable.java    | 53 ++++++++++++----------
 8 files changed, 93 insertions(+), 43 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index b0c3574c0..45ce6f0bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -47,6 +47,7 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 /** {@link FileStoreTable} with privilege checks. */
 public class PrivilegedFileStoreTable implements FileStoreTable {
@@ -143,6 +144,11 @@ public class PrivilegedFileStoreTable implements FileStoreTable {
                 wrapped.copy(dynamicOptions), privilegeChecker, identifier);
     }
 
+    @Override
+    public OptionalLong latestSnapshotId() {
+        return wrapped.latestSnapshotId();
+    }
+
     @Override
     public FileStoreTable copy(TableSchema newTableSchema) {
         return new PrivilegedFileStoreTable(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 86672dda8..0171ff677 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -72,6 +72,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.SortedMap;
 import java.util.function.BiConsumer;
 
@@ -107,6 +108,12 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         this.catalogEnvironment = catalogEnvironment;
     }
 
+    @Override
+    public OptionalLong latestSnapshotId() {
+        Long snapshot = store().snapshotManager().latestSnapshotId();
+        return snapshot == null ? OptionalLong.empty() : OptionalLong.of(snapshot);
+    }
+
     @Override
     public String name() {
         Identifier identifier = catalogEnvironment.identifier();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 452b6b61c..b9eeba398 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 /** Readonly table which only provide implementation for scan and read. */
 public interface ReadonlyTable extends InnerTable {
@@ -103,6 +104,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default OptionalLong latestSnapshotId() {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support currentSnapshot.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void rollbackTo(long snapshotId) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 0e23ec607..708e25c1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -31,6 +31,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 /**
  * A table provides basic abstraction for table type and table scan and table read.
@@ -73,6 +74,10 @@ public interface Table extends Serializable {
     /** Copy this table with adding dynamic options. */
     Table copy(Map<String, String> dynamicOptions);
 
+    /** Get the latest snapshot id for this table, or empty if there are no snapshots. */
+    @Experimental
+    OptionalLong latestSnapshotId();
+
     /** Rollback table's state to a specific snapshot. */
     @Experimental
     void rollbackTo(long snapshotId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 6a67f696b..0b922d77b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -69,6 +69,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -94,33 +95,38 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
                                 p.literals()));
             };
 
-    private final FileStoreTable dataTable;
+    private final FileStoreTable wrapped;
 
-    public AuditLogTable(FileStoreTable dataTable) {
-        this.dataTable = dataTable;
+    public AuditLogTable(FileStoreTable wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public OptionalLong latestSnapshotId() {
+        return wrapped.latestSnapshotId();
     }
 
     @Override
     public String name() {
-        return dataTable.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
+        return wrapped.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
     }
 
     @Override
     public RowType rowType() {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(0, ROW_KIND, new VarCharType(VarCharType.MAX_LENGTH)));
-        fields.addAll(dataTable.rowType().getFields());
+        fields.addAll(wrapped.rowType().getFields());
         return new RowType(fields);
     }
 
     @Override
     public List<String> partitionKeys() {
-        return dataTable.partitionKeys();
+        return wrapped.partitionKeys();
     }
 
     @Override
     public Map<String, String> options() {
-        return dataTable.options();
+        return wrapped.options();
     }
 
     @Override
@@ -130,57 +136,57 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
 
     @Override
     public SnapshotReader newSnapshotReader() {
-        return new AuditLogDataReader(dataTable.newSnapshotReader());
+        return new AuditLogDataReader(wrapped.newSnapshotReader());
     }
 
     @Override
     public DataTableScan newScan() {
-        return new AuditLogBatchScan(dataTable.newScan());
+        return new AuditLogBatchScan(wrapped.newScan());
     }
 
     @Override
     public StreamDataTableScan newStreamScan() {
-        return new AuditLogStreamScan(dataTable.newStreamScan());
+        return new AuditLogStreamScan(wrapped.newStreamScan());
     }
 
     @Override
     public CoreOptions coreOptions() {
-        return dataTable.coreOptions();
+        return wrapped.coreOptions();
     }
 
     @Override
     public Path location() {
-        return dataTable.location();
+        return wrapped.location();
     }
 
     @Override
     public SnapshotManager snapshotManager() {
-        return dataTable.snapshotManager();
+        return wrapped.snapshotManager();
     }
 
     @Override
     public TagManager tagManager() {
-        return dataTable.tagManager();
+        return wrapped.tagManager();
     }
 
     @Override
     public BranchManager branchManager() {
-        return dataTable.branchManager();
+        return wrapped.branchManager();
     }
 
     @Override
     public InnerTableRead newRead() {
-        return new AuditLogRead(dataTable.newRead());
+        return new AuditLogRead(wrapped.newRead());
     }
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new AuditLogTable(dataTable.copy(dynamicOptions));
+        return new AuditLogTable(wrapped.copy(dynamicOptions));
     }
 
     @Override
     public FileIO fileIO() {
-        return dataTable.fileIO();
+        return wrapped.fileIO();
     }
 
     /** Push down predicate to dataScan and dataRead. */
@@ -463,7 +469,7 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
 
         /** Default projection, just add row kind to the first. */
         private int[] defaultProjection() {
-            int dataFieldCount = dataTable.rowType().getFieldCount();
+            int dataFieldCount = wrapped.rowType().getFieldCount();
             int[] projection = new int[dataFieldCount + 1];
             projection[0] = -1;
             for (int i = 0; i < dataFieldCount; i++) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 7712b1ca2..7f1e8fb0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -55,6 +55,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.utils.SerializationUtils.newBytesType;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
@@ -103,6 +104,11 @@ public class BucketsTable implements DataTable, ReadonlyTable {
         this.databaseName = databaseName;
     }
 
+    @Override
+    public OptionalLong latestSnapshotId() {
+        return wrapped.latestSnapshotId();
+    }
+
     @Override
     public Path location() {
         return wrapped.location();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 58f4bc56a..8eb97130e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -54,6 +54,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
 import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
@@ -90,6 +91,11 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
         this.wrapped = wrapped.copy(dynamicOptions);
     }
 
+    @Override
+    public OptionalLong latestSnapshotId() {
+        return wrapped.latestSnapshotId();
+    }
+
     @Override
     public Path location() {
         return wrapped.location();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 4da8524c8..6be556f0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -38,6 +38,7 @@ import org.apache.paimon.utils.TagManager;
 
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
@@ -54,55 +55,59 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
 
     public static final String READ_OPTIMIZED = "ro";
 
-    private final FileStoreTable dataTable;
+    private final FileStoreTable wrapped;
 
-    public ReadOptimizedTable(FileStoreTable dataTable) {
-        this.dataTable = dataTable;
+    public ReadOptimizedTable(FileStoreTable wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public OptionalLong latestSnapshotId() {
+        return wrapped.latestSnapshotId();
     }
 
     @Override
     public String name() {
-        return dataTable.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
+        return wrapped.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
     }
 
     @Override
     public RowType rowType() {
-        return dataTable.rowType();
+        return wrapped.rowType();
     }
 
     @Override
     public List<String> partitionKeys() {
-        return dataTable.partitionKeys();
+        return wrapped.partitionKeys();
     }
 
     @Override
     public Map<String, String> options() {
-        return dataTable.options();
+        return wrapped.options();
     }
 
     @Override
     public List<String> primaryKeys() {
-        return dataTable.primaryKeys();
+        return wrapped.primaryKeys();
     }
 
     @Override
     public SnapshotReader newSnapshotReader() {
-        if (dataTable.schema().primaryKeys().size() > 0) {
-            return dataTable
-                    .newSnapshotReader()
+        if (wrapped.schema().primaryKeys().size() > 0) {
+            return wrapped.newSnapshotReader()
                     .withLevelFilter(level -> level == coreOptions().numLevels() - 1);
         } else {
-            return dataTable.newSnapshotReader();
+            return wrapped.newSnapshotReader();
         }
     }
 
     @Override
     public DataTableBatchScan newScan() {
         return new DataTableBatchScan(
-                dataTable.schema().primaryKeys().size() > 0,
+                wrapped.schema().primaryKeys().size() > 0,
                 coreOptions(),
                 newSnapshotReader(),
-                DefaultValueAssigner.create(dataTable.schema()));
+                DefaultValueAssigner.create(wrapped.schema()));
     }
 
     @Override
@@ -111,47 +116,47 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
                 coreOptions(),
                 newSnapshotReader(),
                 snapshotManager(),
-                dataTable.supportStreamingReadOverwrite(),
-                DefaultValueAssigner.create(dataTable.schema()));
+                wrapped.supportStreamingReadOverwrite(),
+                DefaultValueAssigner.create(wrapped.schema()));
     }
 
     @Override
     public CoreOptions coreOptions() {
-        return dataTable.coreOptions();
+        return wrapped.coreOptions();
     }
 
     @Override
     public Path location() {
-        return dataTable.location();
+        return wrapped.location();
     }
 
     @Override
     public SnapshotManager snapshotManager() {
-        return dataTable.snapshotManager();
+        return wrapped.snapshotManager();
     }
 
     @Override
     public TagManager tagManager() {
-        return dataTable.tagManager();
+        return wrapped.tagManager();
     }
 
     @Override
     public BranchManager branchManager() {
-        return dataTable.branchManager();
+        return wrapped.branchManager();
     }
 
     @Override
     public InnerTableRead newRead() {
-        return dataTable.newRead();
+        return wrapped.newRead();
     }
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new ReadOptimizedTable(dataTable.copy(dynamicOptions));
+        return new ReadOptimizedTable(wrapped.copy(dynamicOptions));
     }
 
     @Override
     public FileIO fileIO() {
-        return dataTable.fileIO();
+        return wrapped.fileIO();
     }
 }

Reply via email to