This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e3157  Issue 1455: MessageID has always batch index 0 when sending 
messages in a batch (#2099)
83e3157 is described below

commit 83e31579f5df941ad49fb54ec269c1d917821504
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Fri Jul 6 08:37:07 2018 -0700

    Issue 1455: MessageID has always batch index 0 when sending messages in a 
batch (#2099)
    
    *Motivation*
    
    Fixes #1455.
    
    Pulsar uses a callback chain to complete the list of callbacks for a 
batch. However, the callback chain doesn't reference the message instance 
needed to complete each callback, so when the callback chain is triggered, it 
always uses the first message id to complete the whole chain of callbacks.
    
    *Changes*
    
    Introduce a field to keep the message instance in the callback chain, so 
that when the chain is invoked, each callback can use the right message 
instance to complete the callback.
    
    Added an integration test to ensure it works correctly.
---
 .../nonpersistent/NonPersistentReplicator.java     |  7 ++-
 .../service/persistent/PersistentReplicator.java   |  7 ++-
 .../pulsar/client/impl/BatchMessageContainer.java  |  2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 14 ++++-
 .../apache/pulsar/client/impl/SendCallback.java    | 14 ++++-
 .../tests/integration/semantics/SemanticsTest.java | 64 ++++++++++++++++++++++
 6 files changed, 99 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 42b99ad..d505c44 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -212,7 +212,7 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
         };
 
         @Override
-        public void addCallback(SendCallback scb) {
+        public void addCallback(MessageImpl<?> msg, SendCallback scb) {
             // noop
         }
 
@@ -222,6 +222,11 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
         }
 
         @Override
+        public MessageImpl<?> getNextMessage() {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<MessageId> getFuture() {
             return null;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index e0df5ea..7d529f9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -381,7 +381,7 @@ public class PersistentReplicator extends 
AbstractReplicator implements Replicat
         };
 
         @Override
-        public void addCallback(SendCallback scb) {
+        public void addCallback(MessageImpl<?> msg, SendCallback scb) {
             // noop
         }
 
@@ -391,6 +391,11 @@ public class PersistentReplicator extends 
AbstractReplicator implements Replicat
         }
 
         @Override
+        public MessageImpl<?> getNextMessage() {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<MessageId> getFuture() {
             return null;
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
index a97e524..4d2ca09 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
@@ -96,7 +96,7 @@ class BatchMessageContainer {
         }
 
         if (previousCallback != null) {
-            previousCallback.addCallback(callback);
+            previousCallback.addCallback(msg, callback);
         }
         previousCallback = callback;
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index afb6177..a621a0e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -207,6 +207,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
         sendAsync(message, new SendCallback() {
             SendCallback nextCallback = null;
+            MessageImpl<?> nextMsg = null;
             long createdAt = System.nanoTime();
 
             @Override
@@ -220,6 +221,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             }
 
             @Override
+            public MessageImpl<?> getNextMessage() {
+                return nextMsg;
+            }
+
+            @Override
             public void sendComplete(Exception e) {
                 if (e != null) {
                     stats.incrementSendFailed();
@@ -230,20 +236,22 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 }
                 while (nextCallback != null) {
                     SendCallback sendCallback = nextCallback;
+                    MessageImpl<?> msg = nextMsg;
                     if (e != null) {
                         stats.incrementSendFailed();
                         sendCallback.getFuture().completeExceptionally(e);
                     } else {
-                        
sendCallback.getFuture().complete(message.getMessageId());
+                        sendCallback.getFuture().complete(msg.getMessageId());
                         stats.incrementNumAcksReceived(System.nanoTime() - 
createdAt);
                     }
+                    nextMsg = nextCallback.getNextMessage();
                     nextCallback = nextCallback.getNextSendCallback();
-                    sendCallback = null;
                 }
             }
 
             @Override
-            public void addCallback(SendCallback scb) {
+            public void addCallback(MessageImpl<?> msg, SendCallback scb) {
+                nextMsg = msg;
                 nextCallback = scb;
             }
         });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
index e773315..ac8ff4a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
@@ -37,10 +37,11 @@ public interface SendCallback {
     /**
      * used to specify a callback to be invoked on completion of a send 
operation for individual messages sent in a
      * batch. Callbacks for messages in a batch get chained
-     * 
-     * @param scb
+     *
+     * @param msg message sent
+     * @param scb callback associated with the message
      */
-    void addCallback(SendCallback scb);
+    void addCallback(MessageImpl<?> msg, SendCallback scb);
 
     /**
      *
@@ -49,6 +50,13 @@ public interface SendCallback {
     SendCallback getNextSendCallback();
 
     /**
+     * Return next message in chain
+     *
+     * @return next message in chain
+     */
+    MessageImpl<?> getNextMessage();
+
+    /**
      *
      * @return future associated with callback
      */
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index 1ba6533..701b38e 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -19,18 +19,25 @@
 package org.apache.pulsar.tests.integration.semantics;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 import org.testng.annotations.Test;
+import org.testng.collections.Lists;
 
 /**
  * Test pulsar produce/consume semantics
@@ -184,4 +191,61 @@ public class SemanticsTest extends PulsarClusterTestBase {
         receiveAndAssertMessage(consumer, 1L, "message-1");
         receiveAndAssertMessage(consumer, 2L, "message-2");
     }
+
+    @Test(dataProvider = "ServiceUrls")
+    public void testBatchProducing(String serviceUrl) throws Exception {
+        String topicName = generateTopicName("testbatchproducing", true);
+
+        int numMessages = 10;
+
+        List<MessageId> producedMsgIds;
+
+        try (PulsarClient client = PulsarClient.builder()
+            .serviceUrl(serviceUrl)
+            .build()) {
+
+            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe()) {
+
+                try (Producer<String> producer = 
client.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .enableBatching(true)
+                    .batchingMaxMessages(5)
+                    .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                    .create()) {
+
+                    List<CompletableFuture<MessageId>> sendFutures = 
Lists.newArrayList();
+                    for (int i = 0; i < numMessages; i++) {
+                        sendFutures.add(producer.sendAsync("batch-message-" + 
i));
+                    }
+                    CompletableFuture.allOf(sendFutures.toArray(new 
CompletableFuture[numMessages])).get();
+                    producedMsgIds = sendFutures.stream().map(future -> 
future.join()).collect(Collectors.toList());
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<String> m = consumer.receive();
+                    assertEquals(producedMsgIds.get(i), m.getMessageId());
+                    assertEquals("batch-message-" + i, m.getValue());
+                }
+            }
+        }
+
+        // inspect the message ids
+        for (int i = 0; i < 5; i++) {
+            assertTrue(producedMsgIds.get(i) instanceof BatchMessageIdImpl);
+            BatchMessageIdImpl mid = (BatchMessageIdImpl) 
producedMsgIds.get(i);
+            log.info("Message {} id : {}", i, mid);
+
+            assertEquals(i, mid.getBatchIndex());
+        }
+        for (int i = 5; i < 10; i++) {
+            assertTrue(producedMsgIds.get(i) instanceof BatchMessageIdImpl);
+            BatchMessageIdImpl mid = (BatchMessageIdImpl) 
producedMsgIds.get(i);
+            log.info("Message {} id : {}", i, mid);
+
+            assertEquals(i - 5, mid.getBatchIndex());
+        }
+    }
 }

Reply via email to