This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 722dac5  Avoid double executor on non-persistent topic send operation 
(#2351)
722dac5 is described below

commit 722dac58cf5a15e79acc428b6b9978d0b0d6e85b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Aug 15 11:54:02 2018 -0700

    Avoid double executor on non-persistent topic send operation (#2351)
---
 .../service/nonpersistent/NonPersistentTopic.java  | 42 ++++++++--------------
 1 file changed, 15 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e13549e..281d843 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -188,32 +188,24 @@ public class NonPersistentTopic implements Topic {
 
     @Override
     public void publishMessage(ByteBuf data, PublishContext callback) {
-        AtomicInteger msgDeliveryCount = new AtomicInteger(2);
+        callback.completed(null, 0L, 0L);
         ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
 
-        // retain data for sub/replication because io-thread will release 
actual payload
-        data.retain(2);
-        this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
-            subscriptions.forEach((name, subscription) -> {
-                ByteBuf duplicateBuffer = data.retainedDuplicate();
-                Entry entry = create(0L, 0L, duplicateBuffer);
-                // entry internally retains data so, duplicateBuffer should be 
release here
-                duplicateBuffer.release();
-                if (subscription.getDispatcher() != null) {
-                    
subscription.getDispatcher().sendMessages(Lists.newArrayList(entry));
-                } else {
-                    // it happens when subscription is created but dispatcher 
is not created as consumer is not added
-                    // yet
-                    entry.release();
-                }
-            });
-            data.release();
-            if (msgDeliveryCount.decrementAndGet() == 0) {
-                callback.completed(null, 0L, 0L);
+        subscriptions.forEach((name, subscription) -> {
+            ByteBuf duplicateBuffer = data.retainedDuplicate();
+            Entry entry = create(0L, 0L, duplicateBuffer);
+            // entry internally retains data so, duplicateBuffer should be 
release here
+            duplicateBuffer.release();
+            if (subscription.getDispatcher() != null) {
+                
subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
+            } else {
+                // it happens when subscription is created but dispatcher is 
not created as consumer is not added
+                // yet
+                entry.release();
             }
-        }));
+        });
 
-        this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
+        if (!replicators.isEmpty()) {
             replicators.forEach((name, replicator) -> {
                 ByteBuf duplicateBuffer = data.retainedDuplicate();
                 Entry entry = create(0L, 0L, duplicateBuffer);
@@ -221,11 +213,7 @@ public class NonPersistentTopic implements Topic {
                 duplicateBuffer.release();
                 ((NonPersistentReplicator) replicator).sendMessage(entry);
             });
-            data.release();
-            if (msgDeliveryCount.decrementAndGet() == 0) {
-                callback.completed(null, 0L, 0L);
-            }
-        }));
+        }
     }
 
     @Override

Reply via email to