This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new df81844f9 [AMORO-3073]: Paimon Consumer adds the previous Snapshot Id
Info (#3073)
df81844f9 is described below
commit df81844f9111f8b9e9c7c2bb1784ac3170dc5b47
Author: ConradJam <[email protected]>
AuthorDate: Mon Aug 5 09:57:53 2024 +0800
[AMORO-3073]: Paimon Consumer adds the previous Snapshot Id Info (#3073)
---
.../server/dashboard/PaimonTableDescriptor.java | 14 ++++++++++++--
.../amoro/server/dashboard/model/ConsumerInfo.java | 22 ++++++++--------------
2 files changed, 20 insertions(+), 16 deletions(-)
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
index eb5226e6a..44b9364e7 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
@@ -525,14 +525,24 @@ public class PaimonTableDescriptor implements
FormatTableDescriptor {
@Override
public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
FileStoreTable table = getTable(amoroTable);
+ FileStore<?> store = table.store();
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(),
table.location());
List<ConsumerInfo> consumerInfos = new ArrayList<>();
try {
consumerManager
.consumers()
.forEach(
- (consumerId, nextSnapshot) ->
- consumerInfos.add(new ConsumerInfo(consumerId,
nextSnapshot)));
+ (consumerId, nextSnapshotId) -> {
+ long currentSnapshotId = nextSnapshotId;
+ if
(!table.snapshotManager().snapshotExists(currentSnapshotId)) {
+ // if not exits,maybe steaming scan is running,so need to
nextSnapshotId -1
+ currentSnapshotId = nextSnapshotId - 1;
+ }
+ Snapshot snapshot =
table.snapshotManager().snapshot(currentSnapshotId);
+ AmoroSnapshotsOfTable amoroSnapshotsOfTable =
getSnapshotsOfTable(store, snapshot);
+ consumerInfos.add(
+ new ConsumerInfo(consumerId, nextSnapshotId,
amoroSnapshotsOfTable));
+ });
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
index 1e5f6e9c9..4e21a69d6 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
@@ -20,32 +20,26 @@ package org.apache.amoro.server.dashboard.model;
public class ConsumerInfo {
- public static final String CONSUMER_ID = "consumer_id";
- public static final String NEXT_SNAPSHOT_ID = "next_snapshot_id";
+ private final String consumerId;
+ private final long nextSnapshotId;
+ private final AmoroSnapshotsOfTable amoroCurrentSnapshotsOfTable;
- private String consumerId;
- private long nextSnapshotId;
-
- public ConsumerInfo() {}
-
- public ConsumerInfo(String consumerId, long nextSnapshotId) {
+ public ConsumerInfo(
+ String consumerId, long nextSnapshotId, AmoroSnapshotsOfTable
amoroCurrentSnapshotsOfTable) {
this.consumerId = consumerId;
this.nextSnapshotId = nextSnapshotId;
+ this.amoroCurrentSnapshotsOfTable = amoroCurrentSnapshotsOfTable;
}
public String getConsumerId() {
return consumerId;
}
- public void setConsumerId(String consumerId) {
- this.consumerId = consumerId;
- }
-
public long getNextSnapshotId() {
return nextSnapshotId;
}
- public void setNextSnapshotId(long nextSnapshotId) {
- this.nextSnapshotId = nextSnapshotId;
+ public AmoroSnapshotsOfTable getAmoroCurrentSnapshotsOfTable() {
+ return amoroCurrentSnapshotsOfTable;
}
}