This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 22f7323c64c [feat][admin] PIP-330: getMessagesById gets all messages 
(#21918)
22f7323c64c is described below

commit 22f7323c64caadf92003f8c916308f12d81da486
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Jan 22 19:57:18 2024 +0800

    [feat][admin] PIP-330: getMessagesById gets all messages (#21918)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../pulsar/broker/admin/AdminTopicApiTest.java     | 64 ++++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 25 +++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 23 +++++---
 3 files changed, 105 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
index 93bf2349103..a1ed4271616 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
@@ -19,21 +19,33 @@
 package org.apache.pulsar.broker.admin;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -140,4 +152,56 @@ public class AdminTopicApiTest extends 
ProducerConsumerBase {
         }
         assertTrue(stats.getSubscriptions().containsKey(subscriptionName));
     }
+
+    @Test
+    public void testGetMessagesId() throws PulsarClientException, 
ExecutionException, InterruptedException {
+        String topic = newTopicName();
+
+        int numMessages = 10;
+        int batchingMaxMessages = numMessages / 2;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(batchingMaxMessages)
+                .batchingMaxPublishDelay(60, TimeUnit.SECONDS)
+                .create();
+
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            futures.add(producer.sendAsync(("msg-" + i).getBytes(UTF_8)));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        Map<MessageIdImpl, Integer> messageIdMap = new HashMap<>();
+        futures.forEach(n -> {
+            try {
+                MessageId messageId = n.get();
+                if (messageId instanceof MessageIdImpl impl) {
+                    MessageIdImpl key = new MessageIdImpl(impl.getLedgerId(), 
impl.getEntryId(), -1);
+                    Integer i = messageIdMap.computeIfAbsent(key, __ -> 0);
+                    messageIdMap.put(key, i + 1);
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        messageIdMap.forEach((key, value) -> {
+            assertEquals(value, batchingMaxMessages);
+            try {
+                List<Message<byte[]>> messages = 
admin.topics().getMessagesById(topic,
+                        key.getLedgerId(), key.getEntryId());
+                assertNotNull(messages);
+                assertEquals(messages.size(), batchingMaxMessages);
+            } catch (PulsarAdminException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        // The message id doesn't exist.
+        assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics()
+                .getMessagesById(topic, 1024, 2048));
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index cace5cda7bd..574b859e82c 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1679,7 +1679,9 @@ public interface Topics {
      * @return the message indexed by the messageId
      * @throws PulsarAdminException
      *            Unexpected error
+     * @deprecated Using {@link #getMessagesById(String, long, long)} instead.
      */
+    @Deprecated
     Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) 
throws PulsarAdminException;
 
     /**
@@ -1691,9 +1693,32 @@ public interface Topics {
      * @param entryId
      *            Entry id
      * @return a future that can be used to track when the message is returned
+     * @deprecated Using {@link #getMessagesByIdAsync(String, long, long)} 
instead.
      */
+    @Deprecated
     CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long 
ledgerId, long entryId);
 
+    /**
+     * Get the messages by messageId.
+     *
+     * @param topic    Topic name
+     * @param ledgerId Ledger id
+     * @param entryId  Entry id
+     * @return A set of messages.
+     * @throws PulsarAdminException Unexpected error
+     */
+    List<Message<byte[]>> getMessagesById(String topic, long ledgerId, long 
entryId) throws PulsarAdminException;
+
+    /**
+     * Get the messages by messageId asynchronously.
+     *
+     * @param topic    Topic name
+     * @param ledgerId Ledger id
+     * @param entryId  Entry id
+     * @return A future that can be used to track when a set of messages is 
returned.
+     */
+    CompletableFuture<List<Message<byte[]>>> getMessagesByIdAsync(String 
topic, long ledgerId, long entryId);
+
     /**
      * Get message ID published at or just after this absolute timestamp (in 
ms).
      * @param topic
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 39bbb134271..f76cfbcde98 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -986,20 +986,16 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
-    public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String 
topic, long ledgerId, long entryId) {
-        return getRemoteMessageById(topic, ledgerId, entryId);
-    }
-
-    private CompletableFuture<Message<byte[]>> getRemoteMessageById(String 
topic, long ledgerId, long entryId) {
+    public CompletableFuture<List<Message<byte[]>>> 
getMessagesByIdAsync(String topic, long ledgerId, long entryId) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "ledger", 
Long.toString(ledgerId), "entry", Long.toString(entryId));
-        final CompletableFuture<Message<byte[]>> future = new 
CompletableFuture<>();
+        final CompletableFuture<List<Message<byte[]>>> future = new 
CompletableFuture<>();
         asyncGetRequest(path,
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
                         try {
-                            
future.complete(getMessagesFromHttpResponse(topicName.toString(), 
response).get(0));
+                            
future.complete(getMessagesFromHttpResponse(topicName.toString(), response));
                         } catch (Exception e) {
                             future.completeExceptionally(getApiException(e));
                         }
@@ -1013,6 +1009,19 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         return future;
     }
 
+    @Override
+    public List<Message<byte[]>> getMessagesById(String topic, long ledgerId, 
long entryId)
+            throws PulsarAdminException {
+        return sync(() -> getMessagesByIdAsync(topic, ledgerId, entryId));
+    }
+
+    @Deprecated
+    @Override
+    public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String 
topic, long ledgerId, long entryId) {
+        return getMessagesByIdAsync(topic, ledgerId, entryId).thenApply(n -> 
n.get(0));
+    }
+
+    @Deprecated
     @Override
     public Message<byte[]> getMessageById(String topic, long ledgerId, long 
entryId)
             throws PulsarAdminException {

Reply via email to