This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cac3980445 [rest] Introduce consumers api definition in rest (#7339)
cac3980445 is described below
commit cac39804458e67a8e0190f2f59c04e58c68132ce
Author: xuzifu666 <[email protected]>
AuthorDate: Thu Mar 5 15:48:22 2026 +0800
[rest] Introduce consumers api definition in rest (#7339)
---
.../org/apache/paimon/consumer/ConsumerInfo.java | 55 +++++++++++++++++
.../main/java/org/apache/paimon/rest/RESTApi.java | 33 +++++++++++
.../java/org/apache/paimon/rest/ResourcePaths.java | 12 ++++
.../rest/responses/ListConsumersResponse.java | 69 ++++++++++++++++++++++
.../java/org/apache/paimon/catalog/Catalog.java | 23 ++++++++
.../java/org/apache/paimon/rest/RESTCatalog.java | 14 +++++
.../org/apache/paimon/rest/RESTCatalogServer.java | 22 +++++++
.../org/apache/paimon/rest/RESTCatalogTest.java | 45 ++++++++++++++
8 files changed, 273 insertions(+)
diff --git
a/paimon-api/src/main/java/org/apache/paimon/consumer/ConsumerInfo.java
b/paimon-api/src/main/java/org/apache/paimon/consumer/ConsumerInfo.java
new file mode 100644
index 0000000000..18dacacd53
--- /dev/null
+++ b/paimon-api/src/main/java/org/apache/paimon/consumer/ConsumerInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Entry representing a consumer with id and nextSnapshot. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ConsumerInfo {
+ private static final String FIELD_CONSUMER_ID = "consumerId";
+ private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";
+
+ @JsonProperty(FIELD_CONSUMER_ID)
+ private final String consumerId;
+
+ @JsonProperty(FIELD_NEXT_SNAPSHOT)
+ private final Long nextSnapshot;
+
+ @JsonCreator
+ public ConsumerInfo(
+ @JsonProperty(FIELD_CONSUMER_ID) String consumerId,
+ @JsonProperty(FIELD_NEXT_SNAPSHOT) Long nextSnapshot) {
+ this.consumerId = consumerId;
+ this.nextSnapshot = nextSnapshot;
+ }
+
+ @JsonGetter(FIELD_CONSUMER_ID)
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ @JsonGetter(FIELD_NEXT_SNAPSHOT)
+ public Long getNextSnapshot() {
+ return nextSnapshot;
+ }
+}
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 8e90eb7542..eefc2e3de0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.consumer.ConsumerInfo;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
@@ -64,6 +65,7 @@ import org.apache.paimon.rest.responses.GetTagResponse;
import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListBranchesResponse;
+import org.apache.paimon.rest.responses.ListConsumersResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListFunctionDetailsResponse;
import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse;
@@ -594,6 +596,37 @@ public class RESTApi {
return new PagedList<>(snapshots, response.getNextPageToken());
}
+ /**
+ * Get paged consumers list of the table.
+ *
+ * @param identifier path of the table to list consumers
+ * @param maxResults Optional parameter indicating the maximum number of
results to include in
+ * the result. If maxResults is not specified or set to 0, will return
the default number of
+ * max results.
+ * @param pageToken Optional parameter indicating the next page token
allows list to be start
+ * from a specific point.
+ * @return a list of the consumers with provided page size(@param
maxResults) in this table and
+ * next page token.
+ * @throws NoSuchResourceException Exception thrown on HTTP 404 means the
table not exists
+ * @throws ForbiddenException Exception thrown on HTTP 403 means don't
have the permission for
+ * this table
+ */
+ public PagedList<ConsumerInfo> listConsumersPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken) {
+ ListConsumersResponse response =
+ client.get(
+ resourcePaths.consumers(
+ identifier.getDatabaseName(),
identifier.getObjectName()),
+ buildPagedQueryParams(maxResults, pageToken),
+ ListConsumersResponse.class,
+ restAuthFunction);
+ List<ConsumerInfo> consumers = response.getConsumers();
+ if (consumers == null) {
+ return new PagedList<>(emptyList(), null);
+ }
+ return new PagedList<>(consumers, response.getNextPageToken());
+ }
+
/**
* Commit snapshot for table.
*
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index aaa8b2e297..2387b36fdf 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -34,6 +34,7 @@ public class ResourcePaths {
protected static final String BRANCHES = "branches";
protected static final String TAGS = "tags";
protected static final String SNAPSHOTS = "snapshots";
+ protected static final String CONSUMERS = "consumers";
protected static final String VIEWS = "views";
protected static final String TABLE_DETAILS = "table-details";
protected static final String VIEW_DETAILS = "view-details";
@@ -261,6 +262,17 @@ public class ResourcePaths {
TAGS);
}
+ public String consumers(String databaseName, String objectName) {
+ return SLASH.join(
+ V1,
+ prefix,
+ DATABASES,
+ encodeString(databaseName),
+ TABLES,
+ encodeString(objectName),
+ CONSUMERS);
+ }
+
public String tag(String databaseName, String objectName, String tagName) {
return SLASH.join(
V1,
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListConsumersResponse.java
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListConsumersResponse.java
new file mode 100644
index 0000000000..fb156a15a6
--- /dev/null
+++
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListConsumersResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.consumer.ConsumerInfo;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for list consumers. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListConsumersResponse implements PagedResponse<ConsumerInfo> {
+
+ private static final String FIELD_CONSUMERS = "consumers";
+ private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
+
+ @JsonProperty(FIELD_CONSUMERS)
+ private final List<ConsumerInfo> consumers;
+
+ @JsonProperty(FIELD_NEXT_PAGE_TOKEN)
+ private final String nextPageToken;
+
+ public ListConsumersResponse(@JsonProperty(FIELD_CONSUMERS)
List<ConsumerInfo> consumers) {
+ this(consumers, null);
+ }
+
+ @JsonCreator
+ public ListConsumersResponse(
+ @JsonProperty(FIELD_CONSUMERS) List<ConsumerInfo> consumers,
+ @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) {
+ this.consumers = consumers;
+ this.nextPageToken = nextPageToken;
+ }
+
+ @JsonGetter(FIELD_CONSUMERS)
+ public List<ConsumerInfo> getConsumers() {
+ return this.consumers;
+ }
+
+ @Override
+ public List<ConsumerInfo> data() {
+ return this.consumers;
+ }
+
+ @JsonGetter(FIELD_NEXT_PAGE_TOKEN)
+ public String getNextPageToken() {
+ return this.nextPageToken;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 79aace221f..5dfbaf2978 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -22,6 +22,7 @@ import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.consumer.ConsumerInfo;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.partition.Partition;
@@ -742,6 +743,28 @@ public interface Catalog extends AutoCloseable {
Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken)
throws TableNotExistException;
+ /**
+ * Get paged consumers list of the table.
+ *
+ * @param identifier path of the table to list consumers
+ * @param maxResults Optional parameter indicating the maximum number of
results to include in
+ * the result. If maxResults is not specified or set to 0, will return
the default number of
+ * max results.
+ * @param pageToken Optional parameter indicating the next page token
allows list to be start
+ * from a specific point.
+ * @return a list of the consumers with provided page size(@param
maxResults) in this table and
+ * next page token. Each consumer is represented as a Map.Entry where
the key is the
+ * consumer id and the value is the next snapshot id.
+ * @throws TableNotExistException if the table does not exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ default PagedList<ConsumerInfo> listConsumersPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken)
+ throws TableNotExistException {
+ throw new UnsupportedOperationException("This catalog does not support
list consumers");
+ }
+
/**
* rollback table by the given {@link Identifier} and instant.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 6c6344bfc7..4accdcb02d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -31,6 +31,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.consumer.ConsumerInfo;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.ResolvingFileIO;
@@ -358,6 +359,19 @@ public class RESTCatalog implements Catalog {
}
}
+ @Override
+ public PagedList<ConsumerInfo> listConsumersPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken)
+ throws TableNotExistException {
+ try {
+ return api.listConsumersPaged(identifier, maxResults, pageToken);
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ }
+
@Override
public boolean supportsListObjectsPaged() {
return true;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 5d8c5ab3cb..1e0a5125be 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -28,6 +28,8 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.TableMetadata;
+import org.apache.paimon.consumer.ConsumerInfo;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -75,6 +77,7 @@ import org.apache.paimon.rest.responses.GetTagResponse;
import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListBranchesResponse;
+import org.apache.paimon.rest.responses.ListConsumersResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListFunctionDetailsResponse;
import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse;
@@ -388,6 +391,10 @@ public class RESTCatalogServer {
resources.length == 4
&&
ResourcePaths.TABLES.equals(resources[1])
&&
ResourcePaths.SNAPSHOTS.equals(resources[3]);
+ boolean isListConsumers =
+ resources.length == 4
+ &&
ResourcePaths.TABLES.equals(resources[1])
+ &&
ResourcePaths.CONSUMERS.equals(resources[3]);
boolean isLoadSnapshot =
resources.length == 5
&&
ResourcePaths.TABLES.equals(resources[1])
@@ -494,6 +501,8 @@ public class RESTCatalogServer {
return snapshotHandle(identifier);
} else if (isListSnapshots) {
return listSnapshots(identifier);
+ } else if (isListConsumers) {
+ return listConsumers(identifier);
} else if (isLoadSnapshot) {
return loadSnapshot(identifier, resources[4]);
} else if (isTableAuth) {
@@ -785,6 +794,19 @@ public class RESTCatalogServer {
return new
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
}
+ private MockResponse listConsumers(Identifier identifier) throws Exception
{
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ ConsumerManager consumerManager =
+ new ConsumerManager(table.fileIO(), table.location(), "main");
+ Map<String, Long> consumers = consumerManager.consumers();
+ List<ConsumerInfo> consumerEntries =
+ consumers.entrySet().stream()
+ .map(e -> new ConsumerInfo(e.getKey(), e.getValue()))
+ .collect(Collectors.toList());
+ ListConsumersResponse response = new
ListConsumersResponse(consumerEntries, null);
+ return new
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
+ }
+
private MockResponse loadSnapshot(Identifier identifier, String version)
throws Exception {
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 0c17d9029c..8eb3c56aea 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -28,6 +28,8 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.consumer.ConsumerInfo;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -2585,6 +2587,49 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
table.snapshot(14));
}
+ @Test
+ void testListConsumers() throws Exception {
+ Identifier identifier = Identifier.create("test_table_db",
"consumers_table");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ catalog.createTable(
+ identifier,
+ new Schema(
+ Lists.newArrayList(new DataField(0, "col",
DataTypes.INT())),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ emptyMap(),
+ ""),
+ true);
+ FileStoreTable fileStoreTable = (FileStoreTable)
catalog.getTable(identifier);
+
+ // Create some snapshots
+ batchWrite(fileStoreTable, singletonList(1));
+ batchWrite(fileStoreTable, singletonList(1));
+ batchWrite(fileStoreTable, singletonList(1));
+
+ // Create consumers
+ ConsumerManager consumerManager =
+ new ConsumerManager(fileStoreTable.fileIO(),
fileStoreTable.location());
+ consumerManager.resetConsumer("consumer1", new
org.apache.paimon.consumer.Consumer(1));
+ consumerManager.resetConsumer("consumer2", new
org.apache.paimon.consumer.Consumer(2));
+
+ // Test listConsumersPaged
+ assertThat(catalog.listConsumersPaged(identifier, null,
null).getElements().size())
+ .isEqualTo(2);
+
+ // Test with RESTApi directly
+ RESTApi api = ((RESTCatalog) catalog).api();
+ List<ConsumerInfo> consumers =
+ PagedList.listAllFromPagedApi(
+ token -> api.listConsumersPaged(identifier, null,
token));
+ assertThat(consumers)
+ .extracting(ConsumerInfo::getConsumerId)
+ .containsExactlyInAnyOrder("consumer1", "consumer2");
+ assertThat(consumers)
+ .extracting(ConsumerInfo::getNextSnapshot)
+ .containsExactlyInAnyOrder(1L, 2L);
+ }
+
@Test
public void testObjectTable() throws Exception {
// create object table