This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cac3980445 [rest] Introduce consumers api definition in rest (#7339)
cac3980445 is described below

commit cac39804458e67a8e0190f2f59c04e58c68132ce
Author: xuzifu666 <[email protected]>
AuthorDate: Thu Mar 5 15:48:22 2026 +0800

    [rest] Introduce consumers api definition in rest (#7339)
---
 .../org/apache/paimon/consumer/ConsumerInfo.java   | 55 +++++++++++++++++
 .../main/java/org/apache/paimon/rest/RESTApi.java  | 33 +++++++++++
 .../java/org/apache/paimon/rest/ResourcePaths.java | 12 ++++
 .../rest/responses/ListConsumersResponse.java      | 69 ++++++++++++++++++++++
 .../java/org/apache/paimon/catalog/Catalog.java    | 23 ++++++++
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 14 +++++
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 22 +++++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 45 ++++++++++++++
 8 files changed, 273 insertions(+)

diff --git 
a/paimon-api/src/main/java/org/apache/paimon/consumer/ConsumerInfo.java 
b/paimon-api/src/main/java/org/apache/paimon/consumer/ConsumerInfo.java
new file mode 100644
index 0000000000..18dacacd53
--- /dev/null
+++ b/paimon-api/src/main/java/org/apache/paimon/consumer/ConsumerInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Entry representing a consumer with id and nextSnapshot. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ConsumerInfo {
+    private static final String FIELD_CONSUMER_ID = "consumerId";
+    private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";
+
+    @JsonProperty(FIELD_CONSUMER_ID)
+    private final String consumerId;
+
+    @JsonProperty(FIELD_NEXT_SNAPSHOT)
+    private final Long nextSnapshot;
+
+    @JsonCreator
+    public ConsumerInfo(
+            @JsonProperty(FIELD_CONSUMER_ID) String consumerId,
+            @JsonProperty(FIELD_NEXT_SNAPSHOT) Long nextSnapshot) {
+        this.consumerId = consumerId;
+        this.nextSnapshot = nextSnapshot;
+    }
+
+    @JsonGetter(FIELD_CONSUMER_ID)
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    @JsonGetter(FIELD_NEXT_SNAPSHOT)
+    public Long getNextSnapshot() {
+        return nextSnapshot;
+    }
+}
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 8e90eb7542..eefc2e3de0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.consumer.ConsumerInfo;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
@@ -64,6 +65,7 @@ import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListBranchesResponse;
+import org.apache.paimon.rest.responses.ListConsumersResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListFunctionDetailsResponse;
 import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse;
@@ -594,6 +596,37 @@ public class RESTApi {
         return new PagedList<>(snapshots, response.getNextPageToken());
     }
 
+    /**
+     * Get paged consumers list of the table.
+     *
+     * @param identifier path of the table to list consumers
+     * @param maxResults Optional parameter indicating the maximum number of 
results to include in
+     *     the result. If maxResults is not specified or set to 0, will return 
the default number of
+     *     max results.
+     * @param pageToken Optional parameter indicating the next page token, 
which allows the list to start
+     *     from a specific point.
+     * @return a list of the consumers with provided page size(@param 
maxResults) in this table and
+     *     next page token.
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 when the 
table does not exist
+     * @throws ForbiddenException Exception thrown on HTTP 403 when the caller 
does not have permission for
+     *     this table
+     */
+    public PagedList<ConsumerInfo> listConsumersPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken) {
+        ListConsumersResponse response =
+                client.get(
+                        resourcePaths.consumers(
+                                identifier.getDatabaseName(), 
identifier.getObjectName()),
+                        buildPagedQueryParams(maxResults, pageToken),
+                        ListConsumersResponse.class,
+                        restAuthFunction);
+        List<ConsumerInfo> consumers = response.getConsumers();
+        if (consumers == null) {
+            return new PagedList<>(emptyList(), null);
+        }
+        return new PagedList<>(consumers, response.getNextPageToken());
+    }
+
     /**
      * Commit snapshot for table.
      *
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index aaa8b2e297..2387b36fdf 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -34,6 +34,7 @@ public class ResourcePaths {
     protected static final String BRANCHES = "branches";
     protected static final String TAGS = "tags";
     protected static final String SNAPSHOTS = "snapshots";
+    protected static final String CONSUMERS = "consumers";
     protected static final String VIEWS = "views";
     protected static final String TABLE_DETAILS = "table-details";
     protected static final String VIEW_DETAILS = "view-details";
@@ -261,6 +262,17 @@ public class ResourcePaths {
                 TAGS);
     }
 
+    public String consumers(String databaseName, String objectName) {
+        return SLASH.join(
+                V1,
+                prefix,
+                DATABASES,
+                encodeString(databaseName),
+                TABLES,
+                encodeString(objectName),
+                CONSUMERS);
+    }
+
     public String tag(String databaseName, String objectName, String tagName) {
         return SLASH.join(
                 V1,
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListConsumersResponse.java
 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListConsumersResponse.java
new file mode 100644
index 0000000000..fb156a15a6
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListConsumersResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.consumer.ConsumerInfo;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for list consumers. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListConsumersResponse implements PagedResponse<ConsumerInfo> {
+
+    private static final String FIELD_CONSUMERS = "consumers";
+    private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
+
+    @JsonProperty(FIELD_CONSUMERS)
+    private final List<ConsumerInfo> consumers;
+
+    @JsonProperty(FIELD_NEXT_PAGE_TOKEN)
+    private final String nextPageToken;
+
+    public ListConsumersResponse(@JsonProperty(FIELD_CONSUMERS) 
List<ConsumerInfo> consumers) {
+        this(consumers, null);
+    }
+
+    @JsonCreator
+    public ListConsumersResponse(
+            @JsonProperty(FIELD_CONSUMERS) List<ConsumerInfo> consumers,
+            @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) {
+        this.consumers = consumers;
+        this.nextPageToken = nextPageToken;
+    }
+
+    @JsonGetter(FIELD_CONSUMERS)
+    public List<ConsumerInfo> getConsumers() {
+        return this.consumers;
+    }
+
+    @Override
+    public List<ConsumerInfo> data() {
+        return this.consumers;
+    }
+
+    @JsonGetter(FIELD_NEXT_PAGE_TOKEN)
+    public String getNextPageToken() {
+        return this.nextPageToken;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 79aace221f..5dfbaf2978 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -22,6 +22,7 @@ import org.apache.paimon.PagedList;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.consumer.ConsumerInfo;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.partition.Partition;
@@ -742,6 +743,28 @@ public interface Catalog extends AutoCloseable {
             Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
             throws TableNotExistException;
 
+    /**
+     * Get paged consumers list of the table.
+     *
+     * @param identifier path of the table to list consumers
+     * @param maxResults Optional parameter indicating the maximum number of 
results to include in
+     *     the result. If maxResults is not specified or set to 0, will return 
the default number of
+     *     max results.
+     * @param pageToken Optional parameter indicating the next page token, 
which allows the list to start
+     *     from a specific point.
+     * @return a list of the consumers with provided page size(@param 
maxResults) in this table and
+     *     next page token. Each consumer is represented as a ConsumerInfo 
holding the
+     *     consumer id and the next snapshot id.
+     * @throws TableNotExistException if the table does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    default PagedList<ConsumerInfo> listConsumersPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException("This catalog does not support 
list consumers");
+    }
+
     /**
      * rollback table by the given {@link Identifier} and instant.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 6c6344bfc7..4accdcb02d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -31,6 +31,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.consumer.ConsumerInfo;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.ResolvingFileIO;
@@ -358,6 +359,19 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    @Override
+    public PagedList<ConsumerInfo> listConsumersPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException {
+        try {
+            return api.listConsumersPaged(identifier, maxResults, pageToken);
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
     @Override
     public boolean supportsListObjectsPaged() {
         return true;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 5d8c5ab3cb..1e0a5125be 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -28,6 +28,8 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.TableMetadata;
+import org.apache.paimon.consumer.ConsumerInfo;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -75,6 +77,7 @@ import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListBranchesResponse;
+import org.apache.paimon.rest.responses.ListConsumersResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListFunctionDetailsResponse;
 import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse;
@@ -388,6 +391,10 @@ public class RESTCatalogServer {
                                 resources.length == 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
                                         && 
ResourcePaths.SNAPSHOTS.equals(resources[3]);
+                        boolean isListConsumers =
+                                resources.length == 4
+                                        && 
ResourcePaths.TABLES.equals(resources[1])
+                                        && 
ResourcePaths.CONSUMERS.equals(resources[3]);
                         boolean isLoadSnapshot =
                                 resources.length == 5
                                         && 
ResourcePaths.TABLES.equals(resources[1])
@@ -494,6 +501,8 @@ public class RESTCatalogServer {
                             return snapshotHandle(identifier);
                         } else if (isListSnapshots) {
                             return listSnapshots(identifier);
+                        } else if (isListConsumers) {
+                            return listConsumers(identifier);
                         } else if (isLoadSnapshot) {
                             return loadSnapshot(identifier, resources[4]);
                         } else if (isTableAuth) {
@@ -785,6 +794,19 @@ public class RESTCatalogServer {
         return new 
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
     }
 
+    private MockResponse listConsumers(Identifier identifier) throws Exception 
{
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        ConsumerManager consumerManager =
+                new ConsumerManager(table.fileIO(), table.location(), "main");
+        Map<String, Long> consumers = consumerManager.consumers();
+        List<ConsumerInfo> consumerEntries =
+                consumers.entrySet().stream()
+                        .map(e -> new ConsumerInfo(e.getKey(), e.getValue()))
+                        .collect(Collectors.toList());
+        ListConsumersResponse response = new 
ListConsumersResponse(consumerEntries, null);
+        return new 
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
+    }
+
     private MockResponse loadSnapshot(Identifier identifier, String version) 
throws Exception {
 
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 0c17d9029c..8eb3c56aea 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -28,6 +28,8 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.consumer.ConsumerInfo;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -2585,6 +2587,49 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
                         table.snapshot(14));
     }
 
+    @Test
+    void testListConsumers() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", 
"consumers_table");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "col", 
DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        emptyMap(),
+                        ""),
+                true);
+        FileStoreTable fileStoreTable = (FileStoreTable) 
catalog.getTable(identifier);
+
+        // Create some snapshots
+        batchWrite(fileStoreTable, singletonList(1));
+        batchWrite(fileStoreTable, singletonList(1));
+        batchWrite(fileStoreTable, singletonList(1));
+
+        // Create consumers
+        ConsumerManager consumerManager =
+                new ConsumerManager(fileStoreTable.fileIO(), 
fileStoreTable.location());
+        consumerManager.resetConsumer("consumer1", new 
org.apache.paimon.consumer.Consumer(1));
+        consumerManager.resetConsumer("consumer2", new 
org.apache.paimon.consumer.Consumer(2));
+
+        // Test listConsumersPaged
+        assertThat(catalog.listConsumersPaged(identifier, null, 
null).getElements().size())
+                .isEqualTo(2);
+
+        // Test with RESTApi directly
+        RESTApi api = ((RESTCatalog) catalog).api();
+        List<ConsumerInfo> consumers =
+                PagedList.listAllFromPagedApi(
+                        token -> api.listConsumersPaged(identifier, null, 
token));
+        assertThat(consumers)
+                .extracting(ConsumerInfo::getConsumerId)
+                .containsExactlyInAnyOrder("consumer1", "consumer2");
+        assertThat(consumers)
+                .extracting(ConsumerInfo::getNextSnapshot)
+                .containsExactlyInAnyOrder(1L, 2L);
+    }
+
     @Test
     public void testObjectTable() throws Exception {
         // create object table

Reply via email to