This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 722dac5 Avoid double executor on non-persistent topic send operation
(#2351)
722dac5 is described below
commit 722dac58cf5a15e79acc428b6b9978d0b0d6e85b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Aug 15 11:54:02 2018 -0700
Avoid double executor on non-persistent topic send operation (#2351)
---
.../service/nonpersistent/NonPersistentTopic.java | 42 ++++++++--------------
1 file changed, 15 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e13549e..281d843 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -188,32 +188,24 @@ public class NonPersistentTopic implements Topic {
@Override
public void publishMessage(ByteBuf data, PublishContext callback) {
- AtomicInteger msgDeliveryCount = new AtomicInteger(2);
+ callback.completed(null, 0L, 0L);
ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
- // retain data for sub/replication because io-thread will release
actual payload
- data.retain(2);
- this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
- subscriptions.forEach((name, subscription) -> {
- ByteBuf duplicateBuffer = data.retainedDuplicate();
- Entry entry = create(0L, 0L, duplicateBuffer);
- // entry internally retains data so, duplicateBuffer should be
release here
- duplicateBuffer.release();
- if (subscription.getDispatcher() != null) {
-
subscription.getDispatcher().sendMessages(Lists.newArrayList(entry));
- } else {
- // it happens when subscription is created but dispatcher
is not created as consumer is not added
- // yet
- entry.release();
- }
- });
- data.release();
- if (msgDeliveryCount.decrementAndGet() == 0) {
- callback.completed(null, 0L, 0L);
+ subscriptions.forEach((name, subscription) -> {
+ ByteBuf duplicateBuffer = data.retainedDuplicate();
+ Entry entry = create(0L, 0L, duplicateBuffer);
+ // entry internally retains data so, duplicateBuffer should be
release here
+ duplicateBuffer.release();
+ if (subscription.getDispatcher() != null) {
+
subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
+ } else {
+ // it happens when subscription is created but dispatcher is
not created as consumer is not added
+ // yet
+ entry.release();
}
- }));
+ });
- this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
+ if (!replicators.isEmpty()) {
replicators.forEach((name, replicator) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
@@ -221,11 +213,7 @@ public class NonPersistentTopic implements Topic {
duplicateBuffer.release();
((NonPersistentReplicator) replicator).sendMessage(entry);
});
- data.release();
- if (msgDeliveryCount.decrementAndGet() == 0) {
- callback.completed(null, 0L, 0L);
- }
- }));
+ }
}
@Override