This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d00a35ee67e [improve][client] Implement getLastMessageIds API for
Reader (#21051)
d00a35ee67e is described below
commit d00a35ee67e0ba5a1417a21de231351b0966758c
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Sep 12 18:10:49 2023 +0800
[improve][client] Implement getLastMessageIds API for Reader (#21051)
PIP (Pulsar Improvement Proposal): https://github.com/apache/pulsar/pull/21052
### Motivation
Introduce the `getLastMessageIds` API to Reader.
### Modifications
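Add `getLastMessageIds()` and `getLastMessageIdsAsync()` to the `Reader` interface and implement them in `ReaderImpl` and `MultiTopicsReaderImpl` by delegating to the underlying consumer.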
---
.../org/apache/pulsar/client/impl/ReaderTest.java | 52 ++++++++++++++++++++++
.../java/org/apache/pulsar/client/api/Reader.java | 16 +++++++
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 12 +++++
.../org/apache/pulsar/client/impl/ReaderImpl.java | 12 +++++
.../apache/pulsar/websocket/ReaderHandlerTest.java | 12 +++++
5 files changed, 104 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index a50c92f7ab8..64a5da43d44 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -52,6 +53,7 @@ import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
@@ -139,6 +141,56 @@ public class ReaderTest extends
MockedPulsarServiceBaseTest {
Assert.assertFalse(reader.hasMessageAvailable());
}
+ @Test
+ public void testReaderGetLastMessageIds() throws Exception {
+ String topic1 =
"persistent://my-property/my-ns/testReaderGetLastMessageIds-1";
+ String topic2 =
"persistent://my-property/my-ns/testReaderGetLastMessageIds-2";
+ List<String> topicList = new ArrayList<>();
+ topicList.add(topic1);
+ topicList.add(topic2);
+ @Cleanup
+ Reader<byte[]> reader1 = pulsarClient.newReader()
+ .topic(topic1)
+ .startMessageId(MessageId.earliest)
+ .readerName(subscription)
+ .create();
+ @Cleanup
+ Reader<byte[]> reader2 = pulsarClient.newReader()
+ .topics(topicList)
+ .startMessageId(MessageId.earliest)
+ .readerName(subscription)
+ .create();
+
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(topic1)
+ .create();
+ Producer<byte[]> producer2 = pulsarClient.newProducer()
+ .topic(topic2)
+ .create();
+ MessageIdImpl messageId1 = (MessageIdImpl)
producer1.newMessage().send();
+ MessageIdImpl messageId2 = (MessageIdImpl)
producer2.newMessage().send();
+ reader1.readNext();
+ reader2.readNext();
+ reader2.readNext();
+ List<TopicMessageId> topicMessageIds1 = reader1.getLastMessageIds();
+ assertEquals(topicMessageIds1.size(), 1);
+ assertEquals(topicMessageIds1.get(0).getOwnerTopic(), topic1);
+ assertEquals(((MessageIdAdv)topicMessageIds1.get(0)).getEntryId(),
messageId1.getEntryId());
+ assertEquals(((MessageIdAdv)topicMessageIds1.get(0)).getLedgerId(),
messageId1.getLedgerId());
+
+ List<TopicMessageId> topicMessageIds2 = reader2.getLastMessageIds();
+ assertEquals(topicMessageIds2.size(), 2);
+ for (TopicMessageId topicMessageId: topicMessageIds2) {
+ if (topicMessageId.getOwnerTopic().equals(topic1)) {
+ assertEquals(((MessageIdAdv)topicMessageId).getEntryId(),
messageId1.getEntryId());
+ assertEquals(((MessageIdAdv)topicMessageId).getLedgerId(),
messageId1.getLedgerId());
+ } else {
+ assertEquals(((MessageIdAdv)topicMessageId).getEntryId(),
messageId2.getEntryId());
+ assertEquals(((MessageIdAdv)topicMessageId).getLedgerId(),
messageId2.getLedgerId());
+ }
+ }
+ }
+
@Test
public void testReadMessageWithBatching() throws Exception {
String topic =
"persistent://my-property/my-ns/my-reader-topic-with-batching";
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
index 419a759f118..f151278ebad 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import java.io.Closeable;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -217,4 +218,19 @@ public interface Reader<T> extends Closeable {
* @return a future to track the completion of the seek operation
*/
CompletableFuture<Void> seekAsync(long timestamp);
+
+ /**
+ * Get the last message ID of every topic the reader is subscribed to.
+ *
+ * @return the list of TopicMessageId instances of all the topics that the
reader is subscribed to
+ * @throws PulsarClientException if the last message IDs cannot be retrieved.
+ * @apiNote It's guaranteed that the owner topic of each TopicMessageId in
the returned list is different from the owner
+ * topics of the other TopicMessageId instances in that list.
+ */
+ List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
+
+ /**
+ * The asynchronous version of {@link Reader#getLastMessageIds()}.
+ */
+ CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index bef2ccaf92e..86f3199f297 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -38,6 +39,7 @@ import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
@@ -235,4 +237,14 @@ public class MultiTopicsReaderImpl<T> implements Reader<T>
{
public MultiTopicsConsumerImpl<T> getMultiTopicsConsumer() {
return multiTopicsConsumer;
}
+
+ @Override
+ public List<TopicMessageId> getLastMessageIds() throws
PulsarClientException {
+ return multiTopicsConsumer.getLastMessageIds();
+ }
+
+ @Override
+ public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+ return multiTopicsConsumer.getLastMessageIdsAsync();
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 6f8d4bcbc75..8760d69447a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
@@ -257,4 +259,14 @@ public class ReaderImpl<T> implements Reader<T> {
public CompletableFuture<Void> seekAsync(long timestamp) {
return consumer.seekAsync(timestamp);
}
+
+ @Override
+ public List<TopicMessageId> getLastMessageIds() throws
PulsarClientException {
+ return consumer.getLastMessageIds();
+ }
+
+ @Override
+ public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+ return consumer.getLastMessageIdsAsync();
+ }
}
diff --git
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
index 002f15609cd..9ad9368d277 100644
---
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
+++
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.websocket;
+import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
@@ -214,5 +216,15 @@ public class ReaderHandlerTest {
public void close() throws IOException {
}
+
+ @Override
+ public List<TopicMessageId> getLastMessageIds() throws
PulsarClientException {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<List<TopicMessageId>>
getLastMessageIdsAsync() {
+ return null;
+ }
}
}