This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a9186514089 [improve][client] PIP-224: Add getLastMessageIds API 
(#20040)
a9186514089 is described below

commit a91865140893f7e9737f6ce9ffc052584c721884
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Tue Apr 11 10:25:57 2023 +0800

    [improve][client] PIP-224: Add getLastMessageIds API (#20040)
    
    Co-authored-by: Baodi Shi <wudixiaolong...@icloud.com>
---
 .../apache/pulsar/client/api/TopicReaderTest.java  |  6 ++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 37 ++++++++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     | 21 ++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 15 +++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  7 ++++
 .../pulsar/client/impl/MultiMessageIdImpl.java     |  1 +
 .../client/impl/MultiTopicsConsumerImpl.java       | 13 ++++++++
 7 files changed, 100 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index c2eb957ee60..424081b904c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -1096,6 +1096,12 @@ public class TopicReaderTest extends 
ProducerConsumerBase {
         assertFalse(lastMsgId instanceof BatchMessageIdImpl);
         assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
         assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
+        List<TopicMessageId> lastMsgIds = 
reader.getConsumer().getLastMessageIds();
+        assertEquals(lastMsgIds.size(), 1);
+        assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName);
+        MessageIdAdv lastMsgIdAdv = (MessageIdAdv) lastMsgIds.get(0);
+        assertEquals(lastMsgIdAdv.getLedgerId(), messageId.getLedgerId());
+        assertEquals(lastMsgIdAdv.getEntryId(), messageId.getEntryId());
         reader.close();
 
         CountDownLatch latch = new CountDownLatch(numOfMessage);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index ce4a0ae86ac..73fe9799642 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -42,7 +43,9 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -1097,6 +1100,11 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         admin.topics().createPartitionedTopic(topicName2, 2);
         admin.topics().createPartitionedTopic(topicName3, 3);
 
+        final Set<String> topics = new HashSet<>();
+        topics.add(topicName1);
+        IntStream.range(0, 2).forEach(i -> topics.add(topicName2 + 
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+        IntStream.range(0, 3).forEach(i -> topics.add(topicName3 + 
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+
         // 1. producer connect
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1)
             .enableBatching(false)
@@ -1146,12 +1154,27 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             }
         });
 
+        List<TopicMessageId> msgIds = consumer.getLastMessageIds();
+        assertEquals(msgIds.size(), 6);
+        
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()),
 topics);
+        for (TopicMessageId msgId : msgIds) {
+            int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1;
+            if (msgId.getOwnerTopic().equals(topicName1)) {
+                assertEquals(numMessages, totalMessages);
+            } else if (msgId.getOwnerTopic().startsWith(topicName2)) {
+                assertEquals(numMessages, totalMessages / 2);
+            } else {
+                assertEquals(numMessages, totalMessages / 3);
+            }
+        }
+
         for (int i = 0; i < totalMessages; i++) {
             producer1.send((messagePredicate + "producer1-" + i).getBytes());
             producer2.send((messagePredicate + "producer2-" + i).getBytes());
             producer3.send((messagePredicate + "producer3-" + i).getBytes());
         }
 
+
         messageId = consumer.getLastMessageId();
         assertTrue(messageId instanceof MultiMessageIdImpl);
         MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId;
@@ -1170,6 +1193,20 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             }
         });
 
+        msgIds = consumer.getLastMessageIds();
+        assertEquals(msgIds.size(), 6);
+        
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()),
 topics);
+        for (TopicMessageId msgId : msgIds) {
+            int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1;
+            if (msgId.getOwnerTopic().equals(topicName1)) {
+                assertEquals(numMessages, totalMessages * 2);
+            } else if (msgId.getOwnerTopic().startsWith(topicName2)) {
+                assertEquals(numMessages, totalMessages);
+            } else {
+                assertEquals(numMessages, totalMessages / 3 * 2);
+            }
+        }
+
         consumer.unsubscribe();
         consumer.close();
         producer1.close();
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 69409900496..88ad24fe1f4 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -539,16 +540,36 @@ public interface Consumer<T> extends Closeable, 
MessageAcknowledger {
      * Get the last message id available for consume.
      *
      * @return the last message id.
+     * @apiNote If the consumer is a multi-topics consumer, the returned value 
cannot be used anywhere.
+     * @deprecated Use {@link Consumer#getLastMessageIds()} instead.
      */
+    @Deprecated
     MessageId getLastMessageId() throws PulsarClientException;
 
     /**
      * Get the last message id available for consume.
      *
      * @return a future that can be used to track the completion of the 
operation.
+     * @deprecated Use {@link Consumer#getLastMessageIdsAsync()}} instead.
      */
+    @Deprecated
     CompletableFuture<MessageId> getLastMessageIdAsync();
 
+    /**
+     * Get all the last message id of the topics the consumer subscribed.
+     *
+     * @return the list of TopicMessageId instances of all the topics that the 
consumer subscribed
+     * @throws PulsarClientException if failed to get last message id.
+     * @apiNote It's guaranteed that the owner topic of each TopicMessageId in 
the returned list is different from owner
+     *   topics of other TopicMessageId instances
+     */
+    List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
+
+    /**
+     * The asynchronous version of {@link Consumer#getLastMessageIds()}.
+     */
+    CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
+
     /**
      * @return Whether the consumer is connected to the broker
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 973b3302f41..0db2a8e0ab9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -730,6 +731,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     public abstract CompletableFuture<Void> closeAsync();
 
 
+    @Deprecated
     @Override
     public MessageId getLastMessageId() throws PulsarClientException {
         try {
@@ -742,9 +744,22 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
     }
 
+    @Deprecated
     @Override
     public abstract CompletableFuture<MessageId> getLastMessageIdAsync();
 
+    @Override
+    public List<TopicMessageId> getLastMessageIds() throws 
PulsarClientException {
+        try {
+            return getLastMessageIdsAsync().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
+            throw PulsarClientException.unwrap(e);
+        }
+    }
+
     private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
         return SubscriptionType.Shared != type && SubscriptionType.Key_Shared 
!= type;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fb372566426..cc016093196 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2336,11 +2336,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
+    @Deprecated
     @Override
     public CompletableFuture<MessageId> getLastMessageIdAsync() {
         return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId);
     }
 
+    @Override
+    public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+        return getLastMessageIdAsync()
+                .thenApply(msgId -> 
Collections.singletonList(TopicMessageId.create(topic, msgId)));
+    }
+
     public CompletableFuture<GetLastMessageIdResponse> 
internalGetLastMessageIdAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
index 6e60239ffe5..f40e3476dd0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.MessageId;
  * This is useful when MessageId is need for partition/multi-topics/pattern 
consumer.
  * e.g. seek(), ackCumulative(), getLastMessageId().
  */
+@Deprecated
 public class MultiMessageIdImpl implements MessageId {
     @Getter
     private Map<String, MessageId> map;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5fe0e4a82b8..ef0345de919 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1468,6 +1468,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         return partitionsAutoUpdateTimeout;
     }
 
+    @Deprecated
     @Override
     public CompletableFuture<MessageId> getLastMessageIdAsync() {
         CompletableFuture<MessageId> returnFuture = new CompletableFuture<>();
@@ -1496,6 +1497,18 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         return returnFuture;
     }
 
+    @Override
+    public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
+        final List<CompletableFuture<List<TopicMessageId>>> futures = 
consumers.values().stream()
+                .map(ConsumerImpl::getLastMessageIdsAsync)
+                .collect(Collectors.toList());
+        return FutureUtil.waitForAll(futures).thenApply(__ -> {
+            final List<TopicMessageId> messageIds = new ArrayList<>();
+            
futures.stream().map(CompletableFuture::join).forEach(messageIds::addAll);
+            return messageIds;
+        });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
 
     public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {

Reply via email to