codelipenghui commented on a change in pull request #7518:
URL: https://github.com/apache/pulsar/pull/7518#discussion_r491861050
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -681,12 +713,45 @@ public void seek(long timestamp) throws
PulsarClientException {
try {
seekAsync(timestamp).get();
} catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
- return FutureUtil.failedFuture(new PulsarClientException("Seek
operation not supported on topics consumer"));
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ MessageIdImpl targetMessageId =
MessageIdImpl.convertToMessageIdImpl(messageId);
+ if (targetMessageId == null ||
isIllegalMultiTopicsMessageId(messageId)) {
+ resultFuture.completeExceptionally(
+ new PulsarClientException("Illegal messageId, messageId
can only be earliest、latest or determine partition"));
+ return resultFuture;
+ }
+
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(consumers.size());
+ consumers.values().forEach(consumerImpl -> {
+ if (MessageId.latest.equals(messageId) ||
MessageId.earliest.equals(messageId)
+ || consumerImpl.getPartitionIndex() ==
targetMessageId.getPartitionIndex()) {
Review comment:
If the internal topics of the multiple topics consumer with the same
partition index but different topic name, it will introduce some problems here?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -81,6 +83,9 @@
// shared incoming queue was full
private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
+ //When consumerImpl is added to the set, its messages will be ignored
+ private final ConcurrentOpenHashSet<ConsumerImpl<T>> ignoredConsumers;
Review comment:
Why need a set for storing ignored consumers here? I noticed if a
consumer is added to this set, the message will be acked and users will miss
messages of this consumer. Is it an expected behavior?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -1272,4 +1389,35 @@ public Timeout getPartitionsAutoUpdateTimeout() {
}
private static final Logger log =
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
+
+ public boolean addIgnoreConsumer(ConsumerImpl<T> consumer) {
+ return ignoredConsumers.add(consumer);
+ }
+
+ public boolean removeIgnoreConsumer(ConsumerImpl<T> consumer) {
+ return ignoredConsumers.remove(consumer);
+ }
+
+ public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {
+ //only support earliest/latest and messageId contains certain
partition info
+ if (MessageId.earliest.equals(messageId) ||
MessageId.latest.equals(messageId)) {
+ return false;
+ }
+ MessageIdImpl messageIdImpl =
MessageIdImpl.convertToMessageIdImpl(messageId);
+ if (messageIdImpl != null && messageIdImpl.getPartitionIndex() >= 0 &&
messageIdImpl.getLedgerId() >= 0
+ && messageIdImpl.getEntryId() >= 0) {
+ return false;
+ }
+ return true;
+ }
+
+ public void tryAcknowledgeMessage(Message<T> msg) {
+ if (msg != null) {
+ BatchMessageIdImpl batchMessageId = new
BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()));
+ //Non-batching messages ack every time, batchMessage only need to
ack the last one to avoid multi recycle
+ if (batchMessageId.getBatchIndex() < 0 ||
batchMessageId.getBatchSize() - 1 == batchMessageId.getBatchIndex()) {
+ acknowledgeCumulativeAsync(msg);
Review comment:
I think here also assume that all topics of the multiple topics consumer
are under a partitioned topic?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]