This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 36d0d9231 [core] Add latestSnapshotId to Table (#3771)
36d0d9231 is described below
commit 36d0d92316ee2e5a20cd9da3d036fcf5191880cc
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 19:05:55 2024 +0800
[core] Add latestSnapshotId to Table (#3771)
---
.../paimon/privilege/PrivilegedFileStoreTable.java | 6 +++
.../paimon/table/AbstractFileStoreTable.java | 7 +++
.../org/apache/paimon/table/ReadonlyTable.java | 9 ++++
.../main/java/org/apache/paimon/table/Table.java | 5 ++
.../apache/paimon/table/system/AuditLogTable.java | 44 ++++++++++--------
.../apache/paimon/table/system/BucketsTable.java | 6 +++
.../paimon/table/system/FileMonitorTable.java | 6 +++
.../paimon/table/system/ReadOptimizedTable.java | 53 ++++++++++++----------
8 files changed, 93 insertions(+), 43 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index b0c3574c0..45ce6f0bf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -47,6 +47,7 @@ import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
/** {@link FileStoreTable} with privilege checks. */
public class PrivilegedFileStoreTable implements FileStoreTable {
@@ -143,6 +144,11 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
wrapped.copy(dynamicOptions), privilegeChecker, identifier);
}
+ @Override
+ public OptionalLong latestSnapshotId() {
+ return wrapped.latestSnapshotId();
+ }
+
@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new PrivilegedFileStoreTable(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 86672dda8..0171ff677 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -72,6 +72,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.SortedMap;
import java.util.function.BiConsumer;
@@ -107,6 +108,12 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
this.catalogEnvironment = catalogEnvironment;
}
+ @Override
+ public OptionalLong latestSnapshotId() {
+ Long snapshot = store().snapshotManager().latestSnapshotId();
+ return snapshot == null ? OptionalLong.empty() :
OptionalLong.of(snapshot);
+ }
+
@Override
public String name() {
Identifier identifier = catalogEnvironment.identifier();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 452b6b61c..b9eeba398 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -31,6 +31,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
/** Readonly table which only provide implementation for scan and read. */
public interface ReadonlyTable extends InnerTable {
@@ -103,6 +104,14 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+ @Override
+ default OptionalLong latestSnapshotId() {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support currentSnapshot.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default void rollbackTo(long snapshotId) {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 0e23ec607..708e25c1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -31,6 +31,7 @@ import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
/**
* A table provides basic abstraction for table type and table scan and table
read.
@@ -73,6 +74,10 @@ public interface Table extends Serializable {
/** Copy this table with adding dynamic options. */
Table copy(Map<String, String> dynamicOptions);
+ /** Get the latest snapshot id for this table, or empty if there are no
snapshots. */
+ @Experimental
+ OptionalLong latestSnapshotId();
+
/** Rollback table's state to a specific snapshot. */
@Experimental
void rollbackTo(long snapshotId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 6a67f696b..0b922d77b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -69,6 +69,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -94,33 +95,38 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
p.literals()));
};
- private final FileStoreTable dataTable;
+ private final FileStoreTable wrapped;
- public AuditLogTable(FileStoreTable dataTable) {
- this.dataTable = dataTable;
+ public AuditLogTable(FileStoreTable wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public OptionalLong latestSnapshotId() {
+ return wrapped.latestSnapshotId();
}
@Override
public String name() {
- return dataTable.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
+ return wrapped.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
}
@Override
public RowType rowType() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, ROW_KIND, new
VarCharType(VarCharType.MAX_LENGTH)));
- fields.addAll(dataTable.rowType().getFields());
+ fields.addAll(wrapped.rowType().getFields());
return new RowType(fields);
}
@Override
public List<String> partitionKeys() {
- return dataTable.partitionKeys();
+ return wrapped.partitionKeys();
}
@Override
public Map<String, String> options() {
- return dataTable.options();
+ return wrapped.options();
}
@Override
@@ -130,57 +136,57 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
@Override
public SnapshotReader newSnapshotReader() {
- return new AuditLogDataReader(dataTable.newSnapshotReader());
+ return new AuditLogDataReader(wrapped.newSnapshotReader());
}
@Override
public DataTableScan newScan() {
- return new AuditLogBatchScan(dataTable.newScan());
+ return new AuditLogBatchScan(wrapped.newScan());
}
@Override
public StreamDataTableScan newStreamScan() {
- return new AuditLogStreamScan(dataTable.newStreamScan());
+ return new AuditLogStreamScan(wrapped.newStreamScan());
}
@Override
public CoreOptions coreOptions() {
- return dataTable.coreOptions();
+ return wrapped.coreOptions();
}
@Override
public Path location() {
- return dataTable.location();
+ return wrapped.location();
}
@Override
public SnapshotManager snapshotManager() {
- return dataTable.snapshotManager();
+ return wrapped.snapshotManager();
}
@Override
public TagManager tagManager() {
- return dataTable.tagManager();
+ return wrapped.tagManager();
}
@Override
public BranchManager branchManager() {
- return dataTable.branchManager();
+ return wrapped.branchManager();
}
@Override
public InnerTableRead newRead() {
- return new AuditLogRead(dataTable.newRead());
+ return new AuditLogRead(wrapped.newRead());
}
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new AuditLogTable(dataTable.copy(dynamicOptions));
+ return new AuditLogTable(wrapped.copy(dynamicOptions));
}
@Override
public FileIO fileIO() {
- return dataTable.fileIO();
+ return wrapped.fileIO();
}
/** Push down predicate to dataScan and dataRead. */
@@ -463,7 +469,7 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
/** Default projection, just add row kind to the first. */
private int[] defaultProjection() {
- int dataFieldCount = dataTable.rowType().getFieldCount();
+ int dataFieldCount = wrapped.rowType().getFieldCount();
int[] projection = new int[dataFieldCount + 1];
projection[0] = -1;
for (int i = 0; i < dataFieldCount; i++) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 7712b1ca2..7f1e8fb0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -55,6 +55,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
@@ -103,6 +104,11 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
this.databaseName = databaseName;
}
+ @Override
+ public OptionalLong latestSnapshotId() {
+ return wrapped.latestSnapshotId();
+ }
+
@Override
public Path location() {
return wrapped.location();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 58f4bc56a..8eb97130e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -54,6 +54,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
@@ -90,6 +91,11 @@ public class FileMonitorTable implements DataTable,
ReadonlyTable {
this.wrapped = wrapped.copy(dynamicOptions);
}
+ @Override
+ public OptionalLong latestSnapshotId() {
+ return wrapped.latestSnapshotId();
+ }
+
@Override
public Path location() {
return wrapped.location();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 4da8524c8..6be556f0b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -38,6 +38,7 @@ import org.apache.paimon.utils.TagManager;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -54,55 +55,59 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
public static final String READ_OPTIMIZED = "ro";
- private final FileStoreTable dataTable;
+ private final FileStoreTable wrapped;
- public ReadOptimizedTable(FileStoreTable dataTable) {
- this.dataTable = dataTable;
+ public ReadOptimizedTable(FileStoreTable wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public OptionalLong latestSnapshotId() {
+ return wrapped.latestSnapshotId();
}
@Override
public String name() {
- return dataTable.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
+ return wrapped.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
}
@Override
public RowType rowType() {
- return dataTable.rowType();
+ return wrapped.rowType();
}
@Override
public List<String> partitionKeys() {
- return dataTable.partitionKeys();
+ return wrapped.partitionKeys();
}
@Override
public Map<String, String> options() {
- return dataTable.options();
+ return wrapped.options();
}
@Override
public List<String> primaryKeys() {
- return dataTable.primaryKeys();
+ return wrapped.primaryKeys();
}
@Override
public SnapshotReader newSnapshotReader() {
- if (dataTable.schema().primaryKeys().size() > 0) {
- return dataTable
- .newSnapshotReader()
+ if (wrapped.schema().primaryKeys().size() > 0) {
+ return wrapped.newSnapshotReader()
.withLevelFilter(level -> level ==
coreOptions().numLevels() - 1);
} else {
- return dataTable.newSnapshotReader();
+ return wrapped.newSnapshotReader();
}
}
@Override
public DataTableBatchScan newScan() {
return new DataTableBatchScan(
- dataTable.schema().primaryKeys().size() > 0,
+ wrapped.schema().primaryKeys().size() > 0,
coreOptions(),
newSnapshotReader(),
- DefaultValueAssigner.create(dataTable.schema()));
+ DefaultValueAssigner.create(wrapped.schema()));
}
@Override
@@ -111,47 +116,47 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
coreOptions(),
newSnapshotReader(),
snapshotManager(),
- dataTable.supportStreamingReadOverwrite(),
- DefaultValueAssigner.create(dataTable.schema()));
+ wrapped.supportStreamingReadOverwrite(),
+ DefaultValueAssigner.create(wrapped.schema()));
}
@Override
public CoreOptions coreOptions() {
- return dataTable.coreOptions();
+ return wrapped.coreOptions();
}
@Override
public Path location() {
- return dataTable.location();
+ return wrapped.location();
}
@Override
public SnapshotManager snapshotManager() {
- return dataTable.snapshotManager();
+ return wrapped.snapshotManager();
}
@Override
public TagManager tagManager() {
- return dataTable.tagManager();
+ return wrapped.tagManager();
}
@Override
public BranchManager branchManager() {
- return dataTable.branchManager();
+ return wrapped.branchManager();
}
@Override
public InnerTableRead newRead() {
- return dataTable.newRead();
+ return wrapped.newRead();
}
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new ReadOptimizedTable(dataTable.copy(dynamicOptions));
+ return new ReadOptimizedTable(wrapped.copy(dynamicOptions));
}
@Override
public FileIO fileIO() {
- return dataTable.fileIO();
+ return wrapped.fileIO();
}
}