This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 493628f26 [AMORO-3027]: Support Paimon Consumer Info in Ams (#3028)
493628f26 is described below
commit 493628f26ced2b8d4423e08a0f6c96a4d1f4c905
Author: ConradJam <[email protected]>
AuthorDate: Sat Jul 20 20:16:29 2024 +0800
[AMORO-3027]: Support Paimon Consumer Info in Ams (#3028)
[AMORO-3027]: Support Paimon Consumer Info in Ams (#3027)
Co-authored-by: ConradJam <[email protected]>
---
.../amoro/server/dashboard/DashboardServer.java | 3 ++
.../server/dashboard/FormatTableDescriptor.java | 4 ++
.../dashboard/MixedAndIcebergTableDescriptor.java | 6 +++
.../server/dashboard/PaimonTableDescriptor.java | 19 ++++++++
.../server/dashboard/ServerTableDescriptor.java | 7 +++
.../dashboard/controller/TableController.java | 15 +++++++
.../amoro/server/dashboard/model/ConsumerInfo.java | 51 ++++++++++++++++++++++
7 files changed, 105 insertions(+)
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
index 6d8c98407..cb9c2a2cb 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
@@ -240,6 +240,9 @@ public class DashboardServer {
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/branches",
tableController::getTableBranches);
+ get(
+ "/catalogs/{catalog}/dbs/{db}/tables/{table}/consumers",
+ tableController::getTableConsumerInfos);
post(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/cancel",
tableController::cancelOptimizingProcess);
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
index 2084529a2..d844b32b1 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server.dashboard;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
import org.apache.amoro.server.dashboard.model.DDLInfo;
import org.apache.amoro.server.dashboard.model.OperationType;
import org.apache.amoro.server.dashboard.model.OptimizingProcessInfo;
@@ -71,4 +72,7 @@ public interface FormatTableDescriptor {
/** Get the branch information of the {@link AmoroTable}. */
List<TagOrBranchInfo> getTableBranches(AmoroTable<?> amoroTable);
+
+ /** Get the consumer information of the {@link AmoroTable}. */
+ List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable);
}
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index 4434304f0..21ca369c9 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.amoro.server.dashboard.component.reverser.IcebergTableMetaExtr
import org.apache.amoro.server.dashboard.model.AMSColumnInfo;
import org.apache.amoro.server.dashboard.model.AMSPartitionField;
import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
import org.apache.amoro.server.dashboard.model.DDLInfo;
import org.apache.amoro.server.dashboard.model.FilesStatistics;
import org.apache.amoro.server.dashboard.model.OperationType;
@@ -484,6 +485,11 @@ public class MixedAndIcebergTableDescriptor extends PersistentBase
return getTableTagsOrBranches(amoroTable, SnapshotRef::isBranch);
}
+ @Override
+ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
+ return Collections.emptyList();
+ }
+
@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, int limit, int offset) {
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
index ad878b268..eb5226e6a 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.amoro.server.dashboard.component.reverser.PaimonTableMetaExtra
import org.apache.amoro.server.dashboard.model.AMSColumnInfo;
import org.apache.amoro.server.dashboard.model.AMSPartitionField;
import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
import org.apache.amoro.server.dashboard.model.DDLInfo;
import org.apache.amoro.server.dashboard.model.OperationType;
import org.apache.amoro.server.dashboard.model.OptimizingProcessInfo;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.Pair;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
@@ -520,6 +522,23 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
return ImmutableList.of(TagOrBranchInfo.MAIN_BRANCH);
}
+ @Override
+ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
+ FileStoreTable table = getTable(amoroTable);
+ ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
+ List<ConsumerInfo> consumerInfos = new ArrayList<>();
+ try {
+ consumerManager
+ .consumers()
+ .forEach(
+ (consumerId, nextSnapshot) ->
+ consumerInfos.add(new ConsumerInfo(consumerId, nextSnapshot)));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return consumerInfos;
+ }
+
private AmoroSnapshotsOfTable manifestListInfo(
FileStore<?> store,
Snapshot snapshot,
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
index a74e0186a..0df27a730 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.api.config.Configurations;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
import org.apache.amoro.server.dashboard.model.DDLInfo;
import org.apache.amoro.server.dashboard.model.OperationType;
import org.apache.amoro.server.dashboard.model.OptimizingProcessInfo;
@@ -117,6 +118,12 @@ public class ServerTableDescriptor extends PersistentBase {
return formatTableDescriptor.getTableBranches(amoroTable);
}
+ public List<ConsumerInfo> getTableConsumersInfos(TableIdentifier tableIdentifier) {
+ AmoroTable<?> amoroTable = loadTable(tableIdentifier);
+ FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
+ return formatTableDescriptor.getTableConsumerInfos(amoroTable);
+ }
+
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
TableIdentifier tableIdentifier, int limit, int offset) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
index ae23a32b7..8d6884f47 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
@@ -39,6 +39,7 @@ import org.apache.amoro.server.dashboard.ServerTableDescriptor;
import org.apache.amoro.server.dashboard.ServerTableProperties;
import org.apache.amoro.server.dashboard.model.AMSColumnInfo;
import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
import org.apache.amoro.server.dashboard.model.DDLInfo;
import org.apache.amoro.server.dashboard.model.HiveTableInfo;
import org.apache.amoro.server.dashboard.model.OperationType;
@@ -614,6 +615,20 @@ public class TableController {
ctx.json(OkResponse.of(amsPageResult));
}
+ public void getTableConsumerInfos(Context ctx) {
+ String catalog = ctx.pathParam("catalog");
+ String database = ctx.pathParam("db");
+ String table = ctx.pathParam("table");
+ Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
+ Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
+ List<ConsumerInfo> consumerInfos =
+ tableDescriptor.getTableConsumersInfos(
+ TableIdentifier.of(catalog, database, table).buildTableIdentifier());
+ int offset = (page - 1) * pageSize;
+ PageResult<ConsumerInfo> amsPageResult = PageResult.of(consumerInfos, offset, pageSize);
+ ctx.json(OkResponse.of(amsPageResult));
+ }
+
/**
* cancel the running optimizing process of one certain table.
*
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
new file mode 100644
index 000000000..1e5f6e9c9
--- /dev/null
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard.model;
+
+public class ConsumerInfo {
+
+ public static final String CONSUMER_ID = "consumer_id";
+ public static final String NEXT_SNAPSHOT_ID = "next_snapshot_id";
+
+ private String consumerId;
+ private long nextSnapshotId;
+
+ public ConsumerInfo() {}
+
+ public ConsumerInfo(String consumerId, long nextSnapshotId) {
+ this.consumerId = consumerId;
+ this.nextSnapshotId = nextSnapshotId;
+ }
+
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ public long getNextSnapshotId() {
+ return nextSnapshotId;
+ }
+
+ public void setNextSnapshotId(long nextSnapshotId) {
+ this.nextSnapshotId = nextSnapshotId;
+ }
+}