This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new df81844f9 [AMORO-3073]: Paimon Consumer adds the previous Snapshot Id Info (#3073)
df81844f9 is described below

commit df81844f9111f8b9e9c7c2bb1784ac3170dc5b47
Author: ConradJam <[email protected]>
AuthorDate: Mon Aug 5 09:57:53 2024 +0800

    [AMORO-3073]: Paimon Consumer adds the previous Snapshot Id Info (#3073)
---
 .../server/dashboard/PaimonTableDescriptor.java    | 14 ++++++++++++--
 .../amoro/server/dashboard/model/ConsumerInfo.java | 22 ++++++++--------------
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
index eb5226e6a..44b9364e7 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
@@ -525,14 +525,24 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
   @Override
   public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
     FileStoreTable table = getTable(amoroTable);
+    FileStore<?> store = table.store();
     ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
     List<ConsumerInfo> consumerInfos = new ArrayList<>();
     try {
       consumerManager
           .consumers()
           .forEach(
-              (consumerId, nextSnapshot) ->
-                  consumerInfos.add(new ConsumerInfo(consumerId, nextSnapshot)));
+              (consumerId, nextSnapshotId) -> {
+                long currentSnapshotId = nextSnapshotId;
+                if (!table.snapshotManager().snapshotExists(currentSnapshotId)) {
+                  // if it does not exist, maybe a streaming scan is running, so fall back to nextSnapshotId - 1
+                  currentSnapshotId = nextSnapshotId - 1;
+                }
+                Snapshot snapshot = table.snapshotManager().snapshot(currentSnapshotId);
+                AmoroSnapshotsOfTable amoroSnapshotsOfTable = getSnapshotsOfTable(store, snapshot);
+                consumerInfos.add(
+                    new ConsumerInfo(consumerId, nextSnapshotId, amoroSnapshotsOfTable));
+              });
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
index 1e5f6e9c9..4e21a69d6 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
@@ -20,32 +20,26 @@ package org.apache.amoro.server.dashboard.model;
 
 public class ConsumerInfo {
 
-  public static final String CONSUMER_ID = "consumer_id";
-  public static final String NEXT_SNAPSHOT_ID = "next_snapshot_id";
+  private final String consumerId;
+  private final long nextSnapshotId;
+  private final AmoroSnapshotsOfTable amoroCurrentSnapshotsOfTable;
 
-  private String consumerId;
-  private long nextSnapshotId;
-
-  public ConsumerInfo() {}
-
-  public ConsumerInfo(String consumerId, long nextSnapshotId) {
+  public ConsumerInfo(
+      String consumerId, long nextSnapshotId, AmoroSnapshotsOfTable amoroCurrentSnapshotsOfTable) {
     this.consumerId = consumerId;
     this.nextSnapshotId = nextSnapshotId;
+    this.amoroCurrentSnapshotsOfTable = amoroCurrentSnapshotsOfTable;
   }
 
   public String getConsumerId() {
     return consumerId;
   }
 
-  public void setConsumerId(String consumerId) {
-    this.consumerId = consumerId;
-  }
-
   public long getNextSnapshotId() {
     return nextSnapshotId;
   }
 
-  public void setNextSnapshotId(long nextSnapshotId) {
-    this.nextSnapshotId = nextSnapshotId;
+  public AmoroSnapshotsOfTable getAmoroCurrentSnapshotsOfTable() {
+    return amoroCurrentSnapshotsOfTable;
   }
 }

Reply via email to