This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 01badd25a60a58e1b1c74831acb11a05cc65971a Author: WJL3333 <[email protected]> AuthorDate: Wed Nov 2 22:32:42 2022 +0800 [improve][broker] Get lowest PositionImpl from NavigableSet (#18278) * [cleanup] Directly get the lowest PositionImpl from the TreeMap: change the signature from Set<T> to NavigableSet<T>, which lets the caller get the lowest PositionImpl more efficiently. * change poll to first when calling `NavigableSet` * fix checkstyle: remove unused import Co-authored-by: wangjinlong <[email protected]> --- .../broker/delayed/DelayedDeliveryTracker.java | 4 ++-- .../delayed/InMemoryDelayedDeliveryTracker.java | 6 +++--- .../persistent/MessageRedeliveryController.java | 3 ++- .../PersistentDispatcherMultipleConsumers.java | 20 ++++++++++++-------- ...rsistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++---- .../utils/ConcurrentBitmapSortedLongPairSet.java | 4 ++-- 6 files changed, 27 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index 35853d3599b..68943f6c398 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.delayed; import com.google.common.annotations.Beta; -import java.util.Set; +import java.util.NavigableSet; import org.apache.bookkeeper.mledger.impl.PositionImpl; /** @@ -53,7 +53,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable { /** * Get a set of position of messages that have already reached the delivery time. 
*/ - Set<PositionImpl> getScheduledMessages(int maxMessages); + NavigableSet<PositionImpl> getScheduledMessages(int maxMessages); /** * Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 81e174cdc80..c7e6e431fcc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -22,7 +22,7 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import java.time.Clock; -import java.util.Set; +import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -146,9 +146,9 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T * Get a set of position of messages that have already reached. 
*/ @Override - public Set<PositionImpl> getScheduledMessages(int maxMessages) { + public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) { int n = maxMessages; - Set<PositionImpl> positions = new TreeSet<>(); + NavigableSet<PositionImpl> positions = new TreeSet<>(); long cutoffTime = getCutoffTime(); while (n > 0 && !priorityQueue.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index a8f6c14c537..2745facf21e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent; import com.google.common.collect.ComparisonChain; import java.util.ArrayList; import java.util.List; +import java.util.NavigableSet; import java.util.Set; import javax.annotation.concurrent.NotThreadSafe; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -135,7 +136,7 @@ public class MessageRedeliveryController { return false; } - public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { + public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index f74ac857999..ef684d72397 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -24,6 +24,7 @@ import com.google.common.collect.Range; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.NavigableSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -248,7 +249,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul return; } - Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead); + NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead); if (!messagesToReplayNow.isEmpty()) { if (log.isDebugEnabled()) { @@ -257,7 +258,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } havePendingReplayRead = true; - minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null); + minReplayedPosition = messagesToReplayNow.first(); Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled() ? 
asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket @@ -281,11 +282,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul consumerList.size()); } havePendingRead = true; - Set<PositionImpl> toReplay = getMessagesToReplayNow(1); - minReplayedPosition = toReplay.stream().findFirst().orElse(null); - if (minReplayedPosition != null) { + NavigableSet<PositionImpl> toReplay = getMessagesToReplayNow(1); + if (!toReplay.isEmpty()) { + minReplayedPosition = toReplay.first(); redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId()); + } else { + minReplayedPosition = null; } + cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, topic.getMaxReadPosition()); } else { @@ -901,17 +905,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } } - protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { + protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { if (!redeliveryMessages.isEmpty()) { return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead); } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); - Set<PositionImpl> messagesAvailableNow = + NavigableSet<PositionImpl> messagesAvailableNow = delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId())); return messagesAvailableNow; } else { - return Collections.emptySet(); + return Collections.emptyNavigableSet(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index c5fdc950727..5be832934db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -168,9 +169,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi // A corner case that we have to retry a readMoreEntries in order to preserver order delivery. // This may happen when consumer closed. See issue #12885 for details. if (!allowOutOfOrderDelivery) { - Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1); + NavigableSet<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1); if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) { - PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get(); + PositionImpl replayPosition = messagesToReplayNow.first(); + // We have received a message potentially from the delayed tracker and, since we're not using it // right now, it needs to be added to the redelivery tracker or we won't attempt anymore to // resend it (until we disconnect consumer). 
@@ -423,13 +425,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } @Override - protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { + protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { if (isDispatcherStuckOnReplays) { // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked // messages kicks in), instead of keep replaying the same old messages, since the consumer that these // messages are routing to might be busy at the moment this.isDispatcherStuckOnReplays = false; - return Collections.emptySet(); + return Collections.emptyNavigableSet(); } else { return super.getMessagesToReplayNow(maxMessagesToRead); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java index c9f1c65daca..a33a78f1eae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java @@ -22,7 +22,6 @@ import java.util.Iterator; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; -import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.locks.ReadWriteLock; @@ -95,7 +94,8 @@ public class ConcurrentBitmapSortedLongPairSet { } - public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) { + public <T extends Comparable<T>> NavigableSet<T> items(int numberOfItems, + LongPairSet.LongPairFunction<T> longPairConverter) { NavigableSet<T> items = new TreeSet<>(); lock.readLock().lock(); try {
