This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a9186514089 [improve][client] PIP-224: Add getLastMessageIds API (#20040) a9186514089 is described below commit a91865140893f7e9737f6ce9ffc052584c721884 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Tue Apr 11 10:25:57 2023 +0800 [improve][client] PIP-224: Add getLastMessageIds API (#20040) Co-authored-by: Baodi Shi <wudixiaolong...@icloud.com> --- .../apache/pulsar/client/api/TopicReaderTest.java | 6 ++++ .../pulsar/client/impl/TopicsConsumerImplTest.java | 37 ++++++++++++++++++++++ .../org/apache/pulsar/client/api/Consumer.java | 21 ++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 15 +++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 7 ++++ .../pulsar/client/impl/MultiMessageIdImpl.java | 1 + .../client/impl/MultiTopicsConsumerImpl.java | 13 ++++++++ 7 files changed, 100 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index c2eb957ee60..424081b904c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -1096,6 +1096,12 @@ public class TopicReaderTest extends ProducerConsumerBase { assertFalse(lastMsgId instanceof BatchMessageIdImpl); assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); + List<TopicMessageId> lastMsgIds = reader.getConsumer().getLastMessageIds(); + assertEquals(lastMsgIds.size(), 1); + assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName); + MessageIdAdv lastMsgIdAdv = (MessageIdAdv) lastMsgIds.get(0); + assertEquals(lastMsgIdAdv.getLedgerId(), messageId.getLedgerId()); + assertEquals(lastMsgIdAdv.getEntryId(), messageId.getEntryId()); reader.close(); CountDownLatch latch = new CountDownLatch(numOfMessage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index ce4a0ae86ac..73fe9799642 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -42,7 +43,9 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; @@ -1097,6 +1100,11 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.topics().createPartitionedTopic(topicName2, 2); admin.topics().createPartitionedTopic(topicName3, 3); + final Set<String> topics = new HashSet<>(); + topics.add(topicName1); + IntStream.range(0, 2).forEach(i -> topics.add(topicName2 + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + IntStream.range(0, 3).forEach(i -> topics.add(topicName3 + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + // 1. producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) .enableBatching(false) @@ -1146,12 +1154,27 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { } }); + List<TopicMessageId> msgIds = consumer.getLastMessageIds(); + assertEquals(msgIds.size(), 6); + assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); + for (TopicMessageId msgId : msgIds) { + int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1; + if (msgId.getOwnerTopic().equals(topicName1)) { + assertEquals(numMessages, totalMessages); + } else if (msgId.getOwnerTopic().startsWith(topicName2)) { + assertEquals(numMessages, totalMessages / 2); + } else { + assertEquals(numMessages, totalMessages / 3); + } + } + for (int i = 0; i < totalMessages; i++) { producer1.send((messagePredicate + "producer1-" + i).getBytes()); producer2.send((messagePredicate + "producer2-" + i).getBytes()); producer3.send((messagePredicate + "producer3-" + i).getBytes()); } + messageId = consumer.getLastMessageId(); assertTrue(messageId instanceof MultiMessageIdImpl); MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId; @@ -1170,6 +1193,20 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { } }); + msgIds = consumer.getLastMessageIds(); + assertEquals(msgIds.size(), 6); + assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); + for (TopicMessageId msgId : msgIds) { + int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1; + if (msgId.getOwnerTopic().equals(topicName1)) { + assertEquals(numMessages, totalMessages * 2); + } else if (msgId.getOwnerTopic().startsWith(topicName2)) { + assertEquals(numMessages, totalMessages); + } else { + assertEquals(numMessages, totalMessages / 3 * 2); + } + } + consumer.unsubscribe(); consumer.close(); producer1.close(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 69409900496..88ad24fe1f4 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.Closeable; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -539,16 +540,36 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger { * Get the last message id available for consume. * * @return the last message id. + * @apiNote If the consumer is a multi-topics consumer, the returned value cannot be used anywhere. + * @deprecated Use {@link Consumer#getLastMessageIds()} instead. */ + @Deprecated MessageId getLastMessageId() throws PulsarClientException; /** * Get the last message id available for consume. * * @return a future that can be used to track the completion of the operation. + * @deprecated Use {@link Consumer#getLastMessageIdsAsync()}} instead. */ + @Deprecated CompletableFuture<MessageId> getLastMessageIdAsync(); + /** + * Get all the last message id of the topics the consumer subscribed. + * + * @return the list of TopicMessageId instances of all the topics that the consumer subscribed + * @throws PulsarClientException if failed to get last message id. + * @apiNote It's guaranteed that the owner topic of each TopicMessageId in the returned list is different from owner + * topics of other TopicMessageId instances + */ + List<TopicMessageId> getLastMessageIds() throws PulsarClientException; + + /** + * The asynchronous version of {@link Consumer#getLastMessageIds()}. + */ + CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync(); + /** * @return Whether the consumer is connected to the broker */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 973b3302f41..0db2a8e0ab9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -730,6 +731,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T public abstract CompletableFuture<Void> closeAsync(); + @Deprecated @Override public MessageId getLastMessageId() throws PulsarClientException { try { @@ -742,9 +744,22 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } } + @Deprecated @Override public abstract CompletableFuture<MessageId> getLastMessageIdAsync(); + @Override + public List<TopicMessageId> getLastMessageIds() throws PulsarClientException { + try { + return getLastMessageIdsAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw PulsarClientException.unwrap(e); + } catch (ExecutionException e) { + throw PulsarClientException.unwrap(e); + } + } + private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) { return SubscriptionType.Shared != type && SubscriptionType.Key_Shared != type; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fb372566426..cc016093196 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2336,11 +2336,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } } + @Deprecated @Override public CompletableFuture<MessageId> getLastMessageIdAsync() { return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId); } + @Override + public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() { + return getLastMessageIdAsync() + .thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId))); + } + public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java index 6e60239ffe5..f40e3476dd0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.MessageId; * This is useful when MessageId is need for partition/multi-topics/pattern consumer. * e.g. seek(), ackCumulative(), getLastMessageId(). */ +@Deprecated public class MultiMessageIdImpl implements MessageId { @Getter private Map<String, MessageId> map; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 5fe0e4a82b8..ef0345de919 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1468,6 +1468,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return partitionsAutoUpdateTimeout; } + @Deprecated @Override public CompletableFuture<MessageId> getLastMessageIdAsync() { CompletableFuture<MessageId> returnFuture = new CompletableFuture<>(); @@ -1496,6 +1497,18 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return returnFuture; } + @Override + public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() { + final List<CompletableFuture<List<TopicMessageId>>> futures = consumers.values().stream() + .map(ConsumerImpl::getLastMessageIdsAsync) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(futures).thenApply(__ -> { + final List<TopicMessageId> messageIds = new ArrayList<>(); + futures.stream().map(CompletableFuture::join).forEach(messageIds::addAll); + return messageIds; + }); + } + private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {