This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 22f7323c64c [feat][admin] PIP-330: getMessagesById gets all messages
(#21918)
22f7323c64c is described below
commit 22f7323c64caadf92003f8c916308f12d81da486
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Jan 22 19:57:18 2024 +0800
[feat][admin] PIP-330: getMessagesById gets all messages (#21918)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../pulsar/broker/admin/AdminTopicApiTest.java | 64 ++++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 25 +++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 23 +++++---
3 files changed, 105 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
index 93bf2349103..a1ed4271616 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
@@ -19,21 +19,33 @@
package org.apache.pulsar.broker.admin;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.TopicStats;
import
org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -140,4 +152,56 @@ public class AdminTopicApiTest extends
ProducerConsumerBase {
}
assertTrue(stats.getSubscriptions().containsKey(subscriptionName));
}
+
+ @Test
+ public void testGetMessagesId() throws PulsarClientException,
ExecutionException, InterruptedException {
+ String topic = newTopicName();
+
+ int numMessages = 10;
+ int batchingMaxMessages = numMessages / 2;
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(batchingMaxMessages)
+ .batchingMaxPublishDelay(60, TimeUnit.SECONDS)
+ .create();
+
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ futures.add(producer.sendAsync(("msg-" + i).getBytes(UTF_8)));
+ }
+ FutureUtil.waitForAll(futures).get();
+
+ Map<MessageIdImpl, Integer> messageIdMap = new HashMap<>();
+ futures.forEach(n -> {
+ try {
+ MessageId messageId = n.get();
+ if (messageId instanceof MessageIdImpl impl) {
+ MessageIdImpl key = new MessageIdImpl(impl.getLedgerId(),
impl.getEntryId(), -1);
+ Integer i = messageIdMap.computeIfAbsent(key, __ -> 0);
+ messageIdMap.put(key, i + 1);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ messageIdMap.forEach((key, value) -> {
+ assertEquals(value, batchingMaxMessages);
+ try {
+ List<Message<byte[]>> messages =
admin.topics().getMessagesById(topic,
+ key.getLedgerId(), key.getEntryId());
+ assertNotNull(messages);
+ assertEquals(messages.size(), batchingMaxMessages);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // The message id doesn't exist.
+ assertThrows(PulsarAdminException.NotFoundException.class, () ->
admin.topics()
+ .getMessagesById(topic, 1024, 2048));
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index cace5cda7bd..574b859e82c 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1679,7 +1679,9 @@ public interface Topics {
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
+ * @deprecated Using {@link #getMessagesById(String, long, long)} instead.
*/
+ @Deprecated
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
throws PulsarAdminException;
/**
@@ -1691,9 +1693,32 @@ public interface Topics {
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
+ * @deprecated Using {@link #getMessagesByIdAsync(String, long, long)}
instead.
*/
+ @Deprecated
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long
ledgerId, long entryId);
+ /**
+ * Get the messages by messageId.
+ *
+ * @param topic Topic name
+ * @param ledgerId Ledger id
+ * @param entryId Entry id
+ * @return A set of messages.
+ * @throws PulsarAdminException Unexpected error
+ */
+ List<Message<byte[]>> getMessagesById(String topic, long ledgerId, long
entryId) throws PulsarAdminException;
+
+ /**
+ * Get the messages by messageId asynchronously.
+ *
+ * @param topic Topic name
+ * @param ledgerId Ledger id
+ * @param entryId Entry id
+ * @return A future that can be used to track when a set of messages is
returned.
+ */
+ CompletableFuture<List<Message<byte[]>>> getMessagesByIdAsync(String
topic, long ledgerId, long entryId);
+
/**
* Get message ID published at or just after this absolute timestamp (in
ms).
* @param topic
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 39bbb134271..f76cfbcde98 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -986,20 +986,16 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
- public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String
topic, long ledgerId, long entryId) {
- return getRemoteMessageById(topic, ledgerId, entryId);
- }
-
- private CompletableFuture<Message<byte[]>> getRemoteMessageById(String
topic, long ledgerId, long entryId) {
+ public CompletableFuture<List<Message<byte[]>>>
getMessagesByIdAsync(String topic, long ledgerId, long entryId) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger",
Long.toString(ledgerId), "entry", Long.toString(entryId));
- final CompletableFuture<Message<byte[]>> future = new
CompletableFuture<>();
+ final CompletableFuture<List<Message<byte[]>>> future = new
CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
-
future.complete(getMessagesFromHttpResponse(topicName.toString(),
response).get(0));
+
future.complete(getMessagesFromHttpResponse(topicName.toString(), response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
@@ -1013,6 +1009,19 @@ public class TopicsImpl extends BaseResource implements
Topics {
return future;
}
+ @Override
+ public List<Message<byte[]>> getMessagesById(String topic, long ledgerId,
long entryId)
+ throws PulsarAdminException {
+ return sync(() -> getMessagesByIdAsync(topic, ledgerId, entryId));
+ }
+
+ @Deprecated
+ @Override
+ public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String
topic, long ledgerId, long entryId) {
+ return getMessagesByIdAsync(topic, ledgerId, entryId).thenApply(n ->
n.get(0));
+ }
+
+ @Deprecated
@Override
public Message<byte[]> getMessageById(String topic, long ledgerId, long
entryId)
throws PulsarAdminException {