This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 83e3157 Issue 1455: MessageID has always batch index 0 when sending messages in a batch (#2099) 83e3157 is described below commit 83e31579f5df941ad49fb54ec269c1d917821504 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Fri Jul 6 08:37:07 2018 -0700 Issue 1455: MessageID has always batch index 0 when sending messages in a batch (#2099) *Motivation* Fixes #1455. Pulsar uses a callback chain for completing the list of callbacks for a batch. However the callback chain doesn't reference the message instance for completing the callback. so when callback chain is triggered, it always uses the first message id to complete the chain of callbacks. *Changes* Introduce a field to keep message instance in the callback chain. So when the chain is invoked, each callback can use the right message instance to complete the callback. Added an integration test to ensure it works correctly. --- .../nonpersistent/NonPersistentReplicator.java | 7 ++- .../service/persistent/PersistentReplicator.java | 7 ++- .../pulsar/client/impl/BatchMessageContainer.java | 2 +- .../apache/pulsar/client/impl/ProducerImpl.java | 14 ++++- .../apache/pulsar/client/impl/SendCallback.java | 14 ++++- .../tests/integration/semantics/SemanticsTest.java | 64 ++++++++++++++++++++++ 6 files changed, 99 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 42b99ad..d505c44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -212,7 +212,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli }; @Override - public void addCallback(SendCallback scb) { + public void addCallback(MessageImpl<?> msg, SendCallback scb) { // noop } @@ -222,6 +222,11 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli } @Override + public MessageImpl<?> getNextMessage() { + return null; + } + + @Override public CompletableFuture<MessageId> getFuture() { return null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index e0df5ea..7d529f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -381,7 +381,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat }; @Override - public void addCallback(SendCallback scb) { + public void addCallback(MessageImpl<?> msg, SendCallback scb) { // noop } @@ -391,6 +391,11 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat } @Override + public MessageImpl<?> getNextMessage() { + return null; + } + + @Override public CompletableFuture<MessageId> getFuture() { return null; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java index a97e524..4d2ca09 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java @@ -96,7 +96,7 @@ class BatchMessageContainer { } if (previousCallback != null) { - previousCallback.addCallback(callback); + previousCallback.addCallback(msg, callback); } previousCallback = callback; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index afb6177..a621a0e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -207,6 +207,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne sendAsync(message, new SendCallback() { SendCallback nextCallback = null; + MessageImpl<?> nextMsg = null; long createdAt = System.nanoTime(); @Override @@ -220,6 +221,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } @Override + public MessageImpl<?> getNextMessage() { + return nextMsg; + } + + @Override public void sendComplete(Exception e) { if (e != null) { stats.incrementSendFailed(); @@ -230,20 +236,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } while (nextCallback != null) { SendCallback sendCallback = nextCallback; + MessageImpl<?> msg = nextMsg; if (e != null) { stats.incrementSendFailed(); sendCallback.getFuture().completeExceptionally(e); } else { - sendCallback.getFuture().complete(message.getMessageId()); + sendCallback.getFuture().complete(msg.getMessageId()); stats.incrementNumAcksReceived(System.nanoTime() - createdAt); } + nextMsg = nextCallback.getNextMessage(); nextCallback = nextCallback.getNextSendCallback(); - sendCallback = null; } } @Override - public void addCallback(SendCallback scb) { + public void addCallback(MessageImpl<?> msg, SendCallback scb) { + nextMsg = msg; nextCallback = scb; } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java index e773315..ac8ff4a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java @@ -37,10 +37,11 @@ public interface SendCallback { /** * used to specify a callback to be invoked on completion of a send operation for individual messages sent in a * batch. Callbacks for messages in a batch get chained - * - * @param scb + * + * @param msg message sent + * @param scb callback associated with the message */ - void addCallback(SendCallback scb); + void addCallback(MessageImpl<?> msg, SendCallback scb); /** * @@ -49,6 +50,13 @@ public interface SendCallback { SendCallback getNextSendCallback(); /** + * Return next message in chain + * + * @return next message in chain + */ + MessageImpl<?> getNextMessage(); + + /** * * @return future associated with callback */ diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java index 1ba6533..701b38e 100644 --- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java +++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java @@ -19,18 +19,25 @@ package org.apache.pulsar.tests.integration.semantics; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.tests.topologies.PulsarClusterTestBase; import org.testng.annotations.Test; +import org.testng.collections.Lists; /** * Test pulsar produce/consume semantics @@ -184,4 +191,61 @@ public class SemanticsTest extends PulsarClusterTestBase { receiveAndAssertMessage(consumer, 1L, "message-1"); receiveAndAssertMessage(consumer, 2L, "message-2"); } + + @Test(dataProvider = "ServiceUrls") + public void testBatchProducing(String serviceUrl) throws Exception { + String topicName = generateTopicName("testbatchproducing", true); + + int numMessages = 10; + + List<MessageId> producedMsgIds; + + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl) + .build()) { + + try (Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("my-sub") + .subscribe()) { + + try (Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(true) + .batchingMaxMessages(5) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create()) { + + List<CompletableFuture<MessageId>> sendFutures = Lists.newArrayList(); + for (int i = 0; i < numMessages; i++) { + sendFutures.add(producer.sendAsync("batch-message-" + i)); + } + CompletableFuture.allOf(sendFutures.toArray(new CompletableFuture[numMessages])).get(); + producedMsgIds = sendFutures.stream().map(future -> future.join()).collect(Collectors.toList()); + } + + for (int i = 0; i < numMessages; i++) { + Message<String> m = consumer.receive(); + assertEquals(producedMsgIds.get(i), m.getMessageId()); + assertEquals("batch-message-" + i, m.getValue()); + } + } + } + + // inspect the message ids + for (int i = 0; i < 5; i++) { + assertTrue(producedMsgIds.get(i) instanceof BatchMessageIdImpl); + BatchMessageIdImpl mid = (BatchMessageIdImpl) producedMsgIds.get(i); + log.info("Message {} id : {}", i, mid); + + assertEquals(i, mid.getBatchIndex()); + } + for (int i = 5; i < 10; i++) { + assertTrue(producedMsgIds.get(i) instanceof BatchMessageIdImpl); + BatchMessageIdImpl mid = (BatchMessageIdImpl) producedMsgIds.get(i); + log.info("Message {} id : {}", i, mid); + + assertEquals(i - 5, mid.getBatchIndex()); + } + } }