This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d00a35ee67e [improve][client] Implement getLastMessageIds API for Reader (#21051)
d00a35ee67e is described below

commit d00a35ee67e0ba5a1417a21de231351b0966758c
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Sep 12 18:10:49 2023 +0800

    [improve][client] Implement getLastMessageIds API for Reader (#21051)
    
    pip: https://github.com/apache/pulsar/pull/21052
    ### Motivation
    
    Introduce the `getLastMessageIds` API to Reader.
    
    ### Modifications
    
    Implement getLastMessageIds API for Reader
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 52 ++++++++++++++++++++++
 .../java/org/apache/pulsar/client/api/Reader.java  | 16 +++++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  | 12 +++++
 .../org/apache/pulsar/client/impl/ReaderImpl.java  | 12 +++++
 .../apache/pulsar/websocket/ReaderHandlerTest.java | 12 +++++
 5 files changed, 104 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index a50c92f7ab8..64a5da43d44 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -52,6 +53,7 @@ import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
@@ -139,6 +141,56 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         Assert.assertFalse(reader.hasMessageAvailable());
     }
 
+    @Test
+    public void testReaderGetLastMessageIds() throws Exception {
+        String topic1 = "persistent://my-property/my-ns/testReaderGetLastMessageIds-1";
+        String topic2 = "persistent://my-property/my-ns/testReaderGetLastMessageIds-2";
+        List<String> topicList = new ArrayList<>();
+        topicList.add(topic1);
+        topicList.add(topic2);
+        @Cleanup
+        Reader<byte[]> reader1 = pulsarClient.newReader()
+                .topic(topic1)
+                .startMessageId(MessageId.earliest)
+                .readerName(subscription)
+                .create();
+        @Cleanup
+        Reader<byte[]> reader2 = pulsarClient.newReader()
+                .topics(topicList)
+                .startMessageId(MessageId.earliest)
+                .readerName(subscription)
+                .create();
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic1)
+                .create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer()
+                .topic(topic2)
+                .create();
+        MessageIdImpl messageId1 = (MessageIdImpl) producer1.newMessage().send();
+        MessageIdImpl messageId2 = (MessageIdImpl) producer2.newMessage().send();
+        reader1.readNext();
+        reader2.readNext();
+        reader2.readNext();
+        List<TopicMessageId> topicMessageIds1 =  reader1.getLastMessageIds();
+        assertEquals(topicMessageIds1.size(), 1);
+        assertEquals(topicMessageIds1.get(0).getOwnerTopic(), topic1);
+        assertEquals(((MessageIdAdv)topicMessageIds1.get(0)).getEntryId(), messageId1.getEntryId());
+        assertEquals(((MessageIdAdv)topicMessageIds1.get(0)).getLedgerId(), messageId1.getLedgerId());
+
+        List<TopicMessageId> topicMessageIds2 = reader2.getLastMessageIds();
+        assertEquals(topicMessageIds2.size(), 2);
+        for (TopicMessageId topicMessageId: topicMessageIds2) {
+            if (topicMessageId.getOwnerTopic().equals(topic1)) {
+                assertEquals(((MessageIdAdv)topicMessageId).getEntryId(), messageId1.getEntryId());
+                assertEquals(((MessageIdAdv)topicMessageId).getLedgerId(), messageId1.getLedgerId());
+            } else {
+                assertEquals(((MessageIdAdv)topicMessageId).getEntryId(), messageId2.getEntryId());
+                assertEquals(((MessageIdAdv)topicMessageId).getLedgerId(), messageId2.getLedgerId());
+            }
+        }
+    }
+
     @Test
     public void testReadMessageWithBatching() throws Exception {
         String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching";
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
index 419a759f118..f151278ebad 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -217,4 +218,19 @@ public interface Reader<T> extends Closeable {
      * @return a future to track the completion of the seek operation
      */
     CompletableFuture<Void> seekAsync(long timestamp);
+
+    /**
+     * Get all the last message id of the topics the reader subscribed.
+     *
+     * @return the list of TopicMessageId instances of all the topics that the reader subscribed
+     * @throws PulsarClientException if failed to get last message id.
+     * @apiNote It's guaranteed that the owner topic of each TopicMessageId in the returned list is different from owner
+     *   topics of other TopicMessageId instances
+     */
+    List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
+
+    /**
+     * The asynchronous version of {@link Reader#getLastMessageIds()}.
+     */
+    CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index bef2ccaf92e..86f3199f297 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -38,6 +39,7 @@ import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
@@ -235,4 +237,14 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
     public MultiTopicsConsumerImpl<T> getMultiTopicsConsumer() {
         return multiTopicsConsumer;
     }
+
+    @Override
+    public List<TopicMessageId> getLastMessageIds() throws PulsarClientException {
+        return multiTopicsConsumer.getLastMessageIds();
+    }
+
+    @Override
+    public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+        return multiTopicsConsumer.getLastMessageIdsAsync();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 6f8d4bcbc75..8760d69447a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
@@ -257,4 +259,14 @@ public class ReaderImpl<T> implements Reader<T> {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         return consumer.seekAsync(timestamp);
     }
+
+    @Override
+    public List<TopicMessageId> getLastMessageIds() throws PulsarClientException {
+        return consumer.getLastMessageIds();
+    }
+
+    @Override
+    public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+        return consumer.getLastMessageIdsAsync();
+    }
 }
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
index 002f15609cd..9ad9368d277 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.websocket;
 
+import java.util.List;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
@@ -214,5 +216,15 @@ public class ReaderHandlerTest {
         public void close() throws IOException {
 
         }
+
+        @Override
+        public List<TopicMessageId> getLastMessageIds() throws PulsarClientException {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+            return null;
+        }
     }
 }

Reply via email to