merlimat closed pull request #1522: Reduce the contention on DispatcherSingleActiveConsumer by using a co…
URL: https://github.com/apache/incubator-pulsar/pull/1522
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 1b37f8946..4f6452fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -202,7 +202,7 @@ public void reset() {
     public SubType getType() {
         return subscriptionType;
     }
-    
+
     public Consumer getActiveConsumer() {
         return ACTIVE_CONSUMER_UPDATER.get(this);
     }
@@ -210,11 +210,11 @@ public Consumer getActiveConsumer() {
     public CopyOnWriteArrayList<Consumer> getConsumers() {
         return consumers;
     }
-    
+
     public boolean isConsumerConnected() {
         return ACTIVE_CONSUMER_UPDATER.get(this) != null;
     }
 
     private static final Logger log = LoggerFactory.getLogger(AbstractDispatcherSingleActiveConsumer.class);
-    
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 18afed56c..f47796d93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -63,6 +63,7 @@
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -147,11 +148,11 @@
 
     private final EventLoopGroup acceptorGroup;
     private final EventLoopGroup workerGroup;
-    private final OrderedScheduler topicOrderedExecutor;
+    private final OrderedExecutor topicOrderedExecutor;
     // offline topic backlog cache
     private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache;
     private static final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = prepareDynamicConfigurationMap();
-    private final ConcurrentOpenHashMap<String, Consumer> configRegisteredListeners;
+    private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners;

     private final ConcurrentLinkedQueue<Pair<String, CompletableFuture<Topic>>> pendingTopicLoadingQueue;
 
@@ -203,7 +204,7 @@ public BrokerService(PulsarService pulsar) throws Exception {
 
         this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
-                .name("broker-np-topic-workers").build();
+                .name("broker-topic-workers").build();
         final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
         final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-io");
         final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
@@ -1368,7 +1369,7 @@ private void createPendingLoadTopic() {
 
     }
 
-    public OrderedScheduler getTopicOrderedExecutor() {
+    public OrderedExecutor getTopicOrderedExecutor() {
         return topicOrderedExecutor;
     }
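A side note on the executor change above: OrderedScheduler extends OrderedExecutor, so widening the field and getter to the OrderedExecutor supertype keeps the existing builder while exposing only the ordered-execution contract the dispatchers need. The guarantee relied on is that tasks submitted under the same ordering key run sequentially on one thread, while different keys can proceed in parallel. Below is a minimal sketch of that behaviour; the class name, thread count and topic keys are illustrative assumptions, not values from this PR.

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.util.SafeRun;

public class OrderedExecutorSketch {
    public static void main(String[] args) throws InterruptedException {
        // Illustrative sizing; the broker derives the thread count from its ServiceConfiguration.
        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                .numThreads(4)
                .name("broker-topic-workers")
                .build();

        // OrderedScheduler extends OrderedExecutor, so the widened getter type still works.
        OrderedExecutor topicOrderedExecutor = scheduler;

        // Same key => strictly sequential on one thread; different keys may run in parallel.
        topicOrderedExecutor.executeOrdered("topic-A", SafeRun.safeRun(() -> System.out.println("A: first")));
        topicOrderedExecutor.executeOrdered("topic-A", SafeRun.safeRun(() -> System.out.println("A: second")));
        topicOrderedExecutor.executeOrdered("topic-B", SafeRun.safeRun(() -> System.out.println("B: independent")));

        Thread.sleep(200);
        scheduler.shutdown();
    }
}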
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 738205fdb..c3befa50d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
@@ -38,6 +39,8 @@
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.stream.Collectors;
 
+import lombok.Data;
+
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -104,6 +107,10 @@
 
     private final Map<String, String> metadata;
 
+    public interface SendListener {
+        void sendComplete(ChannelFuture future, SendMessageInfo sendMessageInfo);
+    }
+
     public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     int maxUnackedMessages, ServerCnx cnx, String appId,
@@ -179,13 +186,28 @@ public boolean readCompacted() {
      * Dispatch a list of entries to the consumer. <br/>
      * <b>It is also responsible to release entries data and recycle entries object.</b>
      *
-     * @return a promise that can be use to track when all the data has been written into the socket
+     * @return a SendMessageInfo object that contains the detail of what was sent to consumer
      */
     public SendMessageInfo sendMessages(final List<Entry> entries) {
+        // Empty listener
+        return sendMessages(entries, null);
+    }
+
+    /**
+     * Dispatch a list of entries to the consumer. <br/>
+     * <b>It is also responsible to release entries data and recycle entries object.</b>
+     *
+     * @return a SendMessageInfo object that contains the detail of what was sent to consumer
+     */
+    public SendMessageInfo sendMessages(final List<Entry> entries, SendListener listener) {
         final ChannelHandlerContext ctx = cnx.ctx();
         final SendMessageInfo sentMessages = new SendMessageInfo();
-        final ChannelPromise writePromise = ctx.newPromise();
-        sentMessages.channelPromse = writePromise;
+        final ChannelPromise writePromise = listener != null ? ctx.newPromise() : ctx.voidPromise();
+
+        if (listener != null) {
+            writePromise.addListener(future -> listener.sendComplete(writePromise, sentMessages));
+        }
+
         if (entries.isEmpty()) {
             if (log.isDebugEnabled()) {
                log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}",
@@ -651,30 +673,25 @@ private void clearUnAckedMsgs(Consumer consumer) {
         subscription.addUnAckedMessages(-unaAckedMsgs);
     }
 
-    public static class SendMessageInfo {
-        ChannelPromise channelPromse;
-        int totalSentMessages;
-        long totalSentMessageBytes;
+    public static final class SendMessageInfo {
+        private int totalSentMessages;
+        private long totalSentMessageBytes;
 
-        public ChannelPromise getChannelPromse() {
-            return channelPromse;
-        }
-        public void setChannelPromse(ChannelPromise channelPromse) {
-            this.channelPromse = channelPromse;
-        }
         public int getTotalSentMessages() {
             return totalSentMessages;
         }
+
         public void setTotalSentMessages(int totalSentMessages) {
             this.totalSentMessages = totalSentMessages;
         }
+
         public long getTotalSentMessageBytes() {
             return totalSentMessageBytes;
         }
+
         public void setTotalSentMessageBytes(long totalSentMessageBytes) {
             this.totalSentMessageBytes = totalSentMessageBytes;
         }
-
     }
 
     private static final Logger log = LoggerFactory.getLogger(Consumer.class);
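The net effect of the Consumer changes above: SendMessageInfo no longer carries a ChannelPromise, and callers that care about write completion pass a SendListener instead; when no listener is given, sendMessages() falls back to ctx.voidPromise(), so no per-batch promise is allocated or completed. Below is a caller-side sketch of the new overload; the class and helper method are hypothetical, and only sendMessages, SendListener and the SendMessageInfo getters come from this PR.

import java.util.List;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.Consumer;

public class SendListenerUsageSketch {

    // Hypothetical helper: dispatch a batch and react once it has been written
    // to the consumer's socket, roughly as the single-active-consumer dispatcher does.
    void dispatch(Consumer consumer, List<Entry> entries) {
        consumer.sendMessages(entries, (future, sentMsgInfo) -> {
            if (future.isSuccess()) {
                int messages = sentMsgInfo.getTotalSentMessages();
                long bytes = sentMsgInfo.getTotalSentMessageBytes();
                // e.g. acquire dispatch-rate permits and schedule the next read here
                System.out.println("flushed " + messages + " messages, " + bytes + " bytes");
            }
        });
    }
}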
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index d0160d519..ad0197c33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -26,7 +26,6 @@
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
@@ -42,7 +41,7 @@
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.SafeRun;
@@ -113,7 +112,7 @@
             .newUpdater(NonPersistentTopic.class, "usageCount");
     private volatile long usageCount = 0;
 
-    private final OrderedScheduler executor;
+    private final OrderedExecutor executor;
 
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 29426ffdd..7050f1dd8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -33,6 +33,7 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
@@ -54,7 +55,7 @@
     private final String name;
     private DispatchRateLimiter dispatchRateLimiter;
 
-    private boolean havePendingRead = false;
+    private volatile boolean havePendingRead = false;
 
     private static final int MaxReadBatchSize = 100;
     private int readBatchSize;
@@ -162,7 +163,13 @@ protected void cancelPendingRead() {
     }
 
     @Override
-    public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
+    public void readEntriesComplete(final List<Entry> entries, Object obj) {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+            internalReadEntriesComplete(entries, obj);
+        }));
+    }
+
+    public synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {
         Consumer readConsumer = (Consumer) obj;
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Got messages: {}", name, readConsumer, entries.size());
@@ -195,40 +202,48 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
                 readMoreEntries(currentConsumer);
             }
         } else {
-            SendMessageInfo sentMsgInfo = currentConsumer.sendMessages(entries);
-            final long totalMessagesSent = sentMsgInfo.getTotalSentMessages();
-            final long totalBytesSent = sentMsgInfo.getTotalSentMessageBytes();
-            sentMsgInfo.getChannelPromse().addListener(future -> {
+            currentConsumer.sendMessages(entries, (future, sentMsgInfo) -> {
                 if (future.isSuccess()) {
                    // acquire message-dispatch permits for already delivered messages
                    if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                        topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);
+                        topic.getDispatchRateLimiter().tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
+                                sentMsgInfo.getTotalSentMessageBytes());
 
                         if (dispatchRateLimiter == null) {
                            dispatchRateLimiter = new DispatchRateLimiter(topic, name);
                         }
-                        dispatchRateLimiter.tryDispatchPermit(totalMessagesSent, totalBytesSent);
+                        dispatchRateLimiter.tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
+                                sentMsgInfo.getTotalSentMessageBytes());
                     }
+
                    // Schedule a new read batch operation only after the previous batch has been written to the socket
-                    synchronized (PersistentDispatcherSingleActiveConsumer.this) {
-                        Consumer newConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                        if (newConsumer != null && !havePendingRead) {
-                            readMoreEntries(newConsumer);
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug(
-                                        "[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
-                                        name, newConsumer, newConsumer != null, havePendingRead);
+                    topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+                        synchronized (PersistentDispatcherSingleActiveConsumer.this) {
+                            Consumer newConsumer = getActiveConsumer();
+                            if (newConsumer != null && !havePendingRead) {
+                                readMoreEntries(newConsumer);
+                            } else {
+                                if (log.isDebugEnabled()) {
+                                    log.debug(
+                                            "[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
+                                            name, newConsumer, newConsumer != null, havePendingRead);
+                                }
                             }
                         }
-                    }
+                    }));
                 }
             });
         }
     }
 
     @Override
-    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+            internalConsumerFlow(consumer, additionalNumberOfMessages);
+        }));
+    }
+
+    private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
         if (havePendingRead) {
             if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name,
@@ -253,7 +268,13 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
     }
 
     @Override
-    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
+    public void redeliverUnacknowledgedMessages(Consumer consumer) {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+            internalRedeliverUnacknowledgedMessages(consumer);
+        }));
+    }
+
+    private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer) {
         if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
             log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
                     name, consumer);
@@ -393,8 +414,13 @@ protected void readMoreEntries(Consumer consumer) {
     }
 
     @Override
-    public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+            internalReadEntriesFailed(exception, ctx);
+        }));
+    }
 
+    private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
         havePendingRead = false;
         Consumer c = (Consumer) ctx;
 
@@ -422,19 +448,23 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
         readBatchSize = 1;
 
         topic.getBrokerService().executor().schedule(() -> {
-            synchronized (PersistentDispatcherSingleActiveConsumer.this) {
-                Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                // we should retry the read if we have an active consumer and there is no pending read
-                if (currentConsumer != null && !havePendingRead) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}-{}] Retrying read operation", name, c);
+
+            // Jump again into dispatcher dedicated thread
+            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
+                synchronized (PersistentDispatcherSingleActiveConsumer.this) {
+                    Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                    // we should retry the read if we have an active consumer and there is no pending read
+                    if (currentConsumer != null && !havePendingRead) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}-{}] Retrying read operation", name, c);
+                        }
+                        readMoreEntries(currentConsumer);
+                    } else {
+                        log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", name, c,
+                                currentConsumer, havePendingRead);
                     }
-                    readMoreEntries(currentConsumer);
-                } else {
-                    log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", name, c,
-                            currentConsumer, havePendingRead);
                 }
-            }
+            }));
         }, waitTimeMillis, TimeUnit.MILLISECONDS);
 
     }
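The dispatcher changes above all follow the same pattern: each externally invoked callback (readEntriesComplete, consumerFlow, redeliverUnacknowledgedMessages, readEntriesFailed) immediately re-submits its work to the broker's topic-ordered executor, keyed by the topic name, and the real logic runs in an internal method on that per-topic thread. Callers arriving from managed-ledger or Netty threads therefore no longer queue up on the dispatcher's monitor, which is the contention this PR targets. Below is a stripped-down sketch of the pattern; the class, its fields and the work done in the internal method are illustrative placeholders, not the PR's code.

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.util.SafeRun;

public class OrderedDispatchSketch {

    private final OrderedExecutor topicOrderedExecutor;
    private final String topicName;

    OrderedDispatchSketch(OrderedExecutor executor, String topicName) {
        this.topicOrderedExecutor = executor;
        this.topicName = topicName;
    }

    // Public entry point: may be called from any thread (e.g. a managed-ledger
    // read callback); it only hops onto the per-topic ordered thread.
    public void readEntriesComplete(int entriesCount) {
        topicOrderedExecutor.executeOrdered(topicName,
                SafeRun.safeRun(() -> internalReadEntriesComplete(entriesCount)));
    }

    // Runs on the single thread that owns this topic, so concurrent callers
    // no longer contend on a dispatcher-wide lock.
    private synchronized void internalReadEntriesComplete(int entriesCount) {
        System.out.println("[" + topicName + "] processed " + entriesCount + " entries");
    }

    public static void main(String[] args) throws InterruptedException {
        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                .numThreads(2).name("sketch-topic-workers").build();
        OrderedDispatchSketch dispatcher =
                new OrderedDispatchSketch(scheduler, "persistent://tenant/ns/topic-1");
        dispatcher.readEntriesComplete(10);
        dispatcher.readEntriesComplete(20);
        Thread.sleep(200);
        scheduler.shutdown();
    }
}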


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
