codelipenghui commented on a change in pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#discussion_r491861050



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -681,12 +713,45 @@ public void seek(long timestamp) throws PulsarClientException {
         try {
             seekAsync(timestamp).get();
         } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        MessageIdImpl targetMessageId = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (targetMessageId == null || isIllegalMultiTopicsMessageId(messageId)) {
+            resultFuture.completeExceptionally(
+                    new PulsarClientException("Illegal messageId, messageId can only be earliest、latest or determine partition"));
+            return resultFuture;
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
+        consumers.values().forEach(consumerImpl -> {
+            if (MessageId.latest.equals(messageId) || MessageId.earliest.equals(messageId)
+                    || consumerImpl.getPartitionIndex() == targetMessageId.getPartitionIndex()) {

Review comment:
       If the internal topics of the multi-topics consumer have the same 
partition index but different topic names, won't this cause problems here?
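
       To make the concern concrete, here is a minimal, self-contained sketch. 
`InternalConsumer`, the topic names, and both matching helpers are hypothetical 
stand-ins, not the real `ConsumerImpl` or anything in this PR. It only shows 
that a check on partition index alone selects one consumer per topic, whereas a 
check that also compares the topic name selects exactly one.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionIndexCollision {

    // Hypothetical stand-in for an internal consumer; not the real ConsumerImpl.
    static final class InternalConsumer {
        final String topic;
        final int partitionIndex;

        InternalConsumer(String topic, int partitionIndex) {
            this.topic = topic;
            this.partitionIndex = partitionIndex;
        }
    }

    // Two different partitioned topics (assumed names), two partitions each.
    static List<InternalConsumer> sampleConsumers() {
        List<InternalConsumer> consumers = new ArrayList<>();
        for (String topic : new String[] {"orders", "payments"}) {
            for (int i = 0; i < 2; i++) {
                consumers.add(new InternalConsumer(topic, i));
            }
        }
        return consumers;
    }

    // Same shape as the condition in the diff: compare partition index only.
    static List<InternalConsumer> matchByIndex(List<InternalConsumer> consumers, int targetIndex) {
        List<InternalConsumer> matched = new ArrayList<>();
        for (InternalConsumer c : consumers) {
            if (c.partitionIndex == targetIndex) {
                matched.add(c);
            }
        }
        return matched;
    }

    // Hypothetical stricter check: compare the topic name as well.
    static List<InternalConsumer> matchByTopicAndIndex(
            List<InternalConsumer> consumers, String targetTopic, int targetIndex) {
        List<InternalConsumer> matched = new ArrayList<>();
        for (InternalConsumer c : consumers) {
            if (c.topic.equals(targetTopic) && c.partitionIndex == targetIndex) {
                matched.add(c);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<InternalConsumer> consumers = sampleConsumers();
        // Index-only match picks partition 0 of both topics.
        System.out.println(matchByIndex(consumers, 0).size());
        // Topic-and-index match picks only partition 0 of "orders".
        System.out.println(matchByTopicAndIndex(consumers, "orders", 0).size());
    }
}
```

       With an index-only match, a seek aimed at one topic's partition 0 would 
also be applied to partition 0 of every other subscribed topic.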

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -81,6 +83,9 @@
     // shared incoming queue was full
     private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
 
+    //When consumerImpl is added to the set, its messages will be ignored
+    private final ConcurrentOpenHashSet<ConsumerImpl<T>> ignoredConsumers;

Review comment:
       Why do we need a set for storing ignored consumers here? I noticed that 
if a consumer is added to this set, its messages will be acked and users will 
miss messages from this consumer. Is that the expected behavior?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -1272,4 +1389,35 @@ public Timeout getPartitionsAutoUpdateTimeout() {
     }
 
     private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
+
+    public boolean addIgnoreConsumer(ConsumerImpl<T> consumer) {
+        return ignoredConsumers.add(consumer);
+    }
+
+    public boolean removeIgnoreConsumer(ConsumerImpl<T> consumer) {
+        return ignoredConsumers.remove(consumer);
+    }
+
+    public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {
+        //only support earliest/latest and messageId contains certain partition info
+        if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
+            return false;
+        }
+        MessageIdImpl messageIdImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (messageIdImpl != null && messageIdImpl.getPartitionIndex() >= 0 && messageIdImpl.getLedgerId() >= 0
+                && messageIdImpl.getEntryId() >= 0) {
+            return false;
+        }
+        return true;
+    }
+
+    public void tryAcknowledgeMessage(Message<T> msg) {
+        if (msg != null) {
+            BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()));
+            //Non-batching messages ack every time, batchMessage only need to ack the last one to avoid multi recycle
+            if (batchMessageId.getBatchIndex() < 0 || batchMessageId.getBatchSize() - 1 == batchMessageId.getBatchIndex()) {
+                acknowledgeCumulativeAsync(msg);

Review comment:
       I think this also assumes that all topics of the multi-topics consumer 
belong to a single partitioned topic?
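
       For reference, the ack condition in `tryAcknowledgeMessage` can be 
restated as a standalone predicate. This is only a sketch: `shouldAck` is a 
hypothetical helper name, and it assumes, as the diff's comment implies, that a 
negative batch index denotes a non-batched message.

```java
public class BatchAckPredicate {

    // Hypothetical helper restating the condition from the diff:
    // ack non-batched messages (batchIndex < 0) every time, and ack a
    // batched message only when it is the last entry of its batch.
    static boolean shouldAck(int batchIndex, int batchSize) {
        return batchIndex < 0 || batchIndex == batchSize - 1;
    }

    public static void main(String[] args) {
        System.out.println(shouldAck(-1, 0)); // non-batched message
        System.out.println(shouldAck(0, 3));  // first entry of a batch of 3
        System.out.println(shouldAck(2, 3));  // last entry of a batch of 3
    }
}
```

       The predicate only decides when to ack; it does not address which 
topic's cursor the cumulative ack is applied to, which is the open question 
above.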




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

