This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 493628f26 [AMORO-3027]: Support Paimon Consumer Info in Ams (#3028)
493628f26 is described below

commit 493628f26ced2b8d4423e08a0f6c96a4d1f4c905
Author: ConradJam <[email protected]>
AuthorDate: Sat Jul 20 20:16:29 2024 +0800

    [AMORO-3027]: Support Paimon Consumer Info in Ams (#3028)
    
    [AMORO-3027]: Support Paimon Consumer Info in Ams (#3027)
    
    Co-authored-by: ConradJam <[email protected]>
---
 .../amoro/server/dashboard/DashboardServer.java    |  3 ++
 .../server/dashboard/FormatTableDescriptor.java    |  4 ++
 .../dashboard/MixedAndIcebergTableDescriptor.java  |  6 +++
 .../server/dashboard/PaimonTableDescriptor.java    | 19 ++++++++
 .../server/dashboard/ServerTableDescriptor.java    |  7 +++
 .../dashboard/controller/TableController.java      | 15 +++++++
 .../amoro/server/dashboard/model/ConsumerInfo.java | 51 ++++++++++++++++++++++
 7 files changed, 105 insertions(+)

diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
index 6d8c98407..cb9c2a2cb 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
@@ -240,6 +240,9 @@ public class DashboardServer {
             get(
                 "/catalogs/{catalog}/dbs/{db}/tables/{table}/branches",
                 tableController::getTableBranches);
+            get(
+                "/catalogs/{catalog}/dbs/{db}/tables/{table}/consumers",
+                tableController::getTableConsumerInfos);
             post(
                 "/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/cancel",
                 tableController::cancelOptimizingProcess);
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
index 2084529a2..d844b32b1 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server.dashboard;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
 import org.apache.amoro.server.dashboard.model.DDLInfo;
 import org.apache.amoro.server.dashboard.model.OperationType;
 import org.apache.amoro.server.dashboard.model.OptimizingProcessInfo;
@@ -71,4 +72,7 @@ public interface FormatTableDescriptor {
 
   /** Get the branch information of the {@link AmoroTable}. */
   List<TagOrBranchInfo> getTableBranches(AmoroTable<?> amoroTable);
+
+  /** Get the consumer information of the {@link AmoroTable}. */
+  List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable);
 }
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index 4434304f0..21ca369c9 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.amoro.server.dashboard.component.reverser.IcebergTableMetaExtr
 import org.apache.amoro.server.dashboard.model.AMSColumnInfo;
 import org.apache.amoro.server.dashboard.model.AMSPartitionField;
 import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
 import org.apache.amoro.server.dashboard.model.DDLInfo;
 import org.apache.amoro.server.dashboard.model.FilesStatistics;
 import org.apache.amoro.server.dashboard.model.OperationType;
@@ -484,6 +485,11 @@ public class MixedAndIcebergTableDescriptor extends PersistentBase
     return getTableTagsOrBranches(amoroTable, SnapshotRef::isBranch);
   }
 
+  @Override
+  public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
+    return Collections.emptyList();
+  }
+
   @Override
   public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
       AmoroTable<?> amoroTable, int limit, int offset) {
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
index ad878b268..eb5226e6a 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.amoro.server.dashboard.component.reverser.PaimonTableMetaExtra
 import org.apache.amoro.server.dashboard.model.AMSColumnInfo;
 import org.apache.amoro.server.dashboard.model.AMSPartitionField;
 import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
 import org.apache.amoro.server.dashboard.model.DDLInfo;
 import org.apache.amoro.server.dashboard.model.OperationType;
 import org.apache.amoro.server.dashboard.model.OptimizingProcessInfo;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.Pair;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
@@ -520,6 +522,23 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
     return ImmutableList.of(TagOrBranchInfo.MAIN_BRANCH);
   }
 
+  @Override
+  public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
+    FileStoreTable table = getTable(amoroTable);
+    ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
+    List<ConsumerInfo> consumerInfos = new ArrayList<>();
+    try {
+      consumerManager
+          .consumers()
+          .forEach(
+              (consumerId, nextSnapshot) ->
+                  consumerInfos.add(new ConsumerInfo(consumerId, nextSnapshot)));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return consumerInfos;
+  }
+
   private AmoroSnapshotsOfTable manifestListInfo(
       FileStore<?> store,
       Snapshot snapshot,
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
index a74e0186a..0df27a730 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.amoro.api.TableIdentifier;
 import org.apache.amoro.api.config.Configurations;
 import org.apache.amoro.server.catalog.ServerCatalog;
 import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
 import org.apache.amoro.server.dashboard.model.DDLInfo;
 import org.apache.amoro.server.dashboard.model.OperationType;
 import org.apache.amoro.server.dashboard.model.OptimizingProcessInfo;
@@ -117,6 +118,12 @@ public class ServerTableDescriptor extends PersistentBase {
     return formatTableDescriptor.getTableBranches(amoroTable);
   }
 
+  public List<ConsumerInfo> getTableConsumersInfos(TableIdentifier tableIdentifier) {
+    AmoroTable<?> amoroTable = loadTable(tableIdentifier);
+    FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
+    return formatTableDescriptor.getTableConsumerInfos(amoroTable);
+  }
+
   public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
       TableIdentifier tableIdentifier, int limit, int offset) {
     AmoroTable<?> amoroTable = loadTable(tableIdentifier);
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
index ae23a32b7..8d6884f47 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
@@ -39,6 +39,7 @@ import org.apache.amoro.server.dashboard.ServerTableDescriptor;
 import org.apache.amoro.server.dashboard.ServerTableProperties;
 import org.apache.amoro.server.dashboard.model.AMSColumnInfo;
 import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
 import org.apache.amoro.server.dashboard.model.DDLInfo;
 import org.apache.amoro.server.dashboard.model.HiveTableInfo;
 import org.apache.amoro.server.dashboard.model.OperationType;
@@ -614,6 +615,20 @@ public class TableController {
     ctx.json(OkResponse.of(amsPageResult));
   }
 
+  public void getTableConsumerInfos(Context ctx) {
+    String catalog = ctx.pathParam("catalog");
+    String database = ctx.pathParam("db");
+    String table = ctx.pathParam("table");
+    Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
+    Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
+    List<ConsumerInfo> consumerInfos =
+        tableDescriptor.getTableConsumersInfos(
+            TableIdentifier.of(catalog, database, table).buildTableIdentifier());
+    int offset = (page - 1) * pageSize;
+    PageResult<ConsumerInfo> amsPageResult = PageResult.of(consumerInfos, offset, pageSize);
+    ctx.json(OkResponse.of(amsPageResult));
+  }
+
   /**
    * cancel the running optimizing process of one certain table.
    *
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
new file mode 100644
index 000000000..1e5f6e9c9
--- /dev/null
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard.model;
+
+public class ConsumerInfo {
+
+  public static final String CONSUMER_ID = "consumer_id";
+  public static final String NEXT_SNAPSHOT_ID = "next_snapshot_id";
+
+  private String consumerId;
+  private long nextSnapshotId;
+
+  public ConsumerInfo() {}
+
+  public ConsumerInfo(String consumerId, long nextSnapshotId) {
+    this.consumerId = consumerId;
+    this.nextSnapshotId = nextSnapshotId;
+  }
+
+  public String getConsumerId() {
+    return consumerId;
+  }
+
+  public void setConsumerId(String consumerId) {
+    this.consumerId = consumerId;
+  }
+
+  public long getNextSnapshotId() {
+    return nextSnapshotId;
+  }
+
+  public void setNextSnapshotId(long nextSnapshotId) {
+    this.nextSnapshotId = nextSnapshotId;
+  }
+}

Reply via email to