This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 26b34d405099f23be04cd22549b089c0262512fa Author: lipenghui <[email protected]> AuthorDate: Fri Feb 14 16:05:58 2020 +0800 Should flush the last potential duplicated since can't combine potential duplicated messages and non-duplicated messages into a batch. (#6326) Fixes #6273 Motivation The main reason for #6273 is combining potentially duplicated messages and non-duplicated messages into a batch. So we need to flush the potentially duplicated message first and then add the non-duplicated messages to a batch. --- .../pulsar/client/api/ClientDeduplicationTest.java | 12 ++++++----- .../client/impl/BatchMessageContainerImpl.java | 3 ++- .../apache/pulsar/client/impl/ProducerImpl.java | 24 ++++++++++++++-------- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java index 97e7869..09bfb79 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -182,11 +182,12 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { producer.flush(); // Repeat the messages and verify they're not received by consumer - producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); - producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync(); - producer.close(); + producer.newMessage().value("my-message-0".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-1".getBytes()).sequenceId(4).sendAsync(); + producer.newMessage().value("my-message-3".getBytes()).sequenceId(6).sendAsync(); + producer.flush(); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 4; i++) { Message<byte[]> msg = consumer.receive(); assertEquals(new String(msg.getData()), "my-message-" + i); consumer.acknowledge(msg); 
@@ -196,11 +197,12 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS); assertNull(msg); + producer.close(); // Kill and restart broker restartBroker(); producer = producerBuilder.create(); - assertEquals(producer.getLastSequenceId(), 5L); + assertEquals(producer.getLastSequenceId(), 6L); // Repeat the messages and verify they're not received by consumer producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index d4772ce..bcbd1c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -69,6 +69,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { if (++numMessagesInBatch == 1) { // some properties are common amongst the different messages in the batch, hence we just pick it up from // the first message + messageMetadata.setSequenceId(msg.getSequenceId()); lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); this.firstCallback = callback; batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT @@ -87,7 +88,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { messageMetadata.setSequenceId(lowestSequenceId); } highestSequenceId = msg.getSequenceId(); - producer.lastSequenceIdPushed = msg.getSequenceId(); + producer.lastSequenceIdPushed = Math.max(producer.lastSequenceIdPushed, msg.getSequenceId()); return isBatchFull(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 7805fa8..776300d 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -70,7 +70,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; @@ -114,6 +113,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private volatile long lastSequenceIdPublished; protected volatile long lastSequenceIdPushed; + private volatile boolean isLastSequenceIdPotentialDuplicated; private MessageCrypto msgCrypto = null; @@ -397,6 +397,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // should trigger complete the batch message, new message will add to a new batch and new batch // sequence id use the new message, so that broker can handle the message duplication if (sequenceId <= lastSequenceIdPushed) { + isLastSequenceIdPotentialDuplicated = true; if (sequenceId <= lastSequenceIdPublished) { log.warn("Message with sequence id {} is definitely a duplicate", sequenceId); } else { @@ -405,14 +406,21 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } doBatchSendAndAdd(msg, callback, payload); } else { - // handle boundary cases where message being added would exceed - // batch size and/or max message size - boolean isBatchFull = batchMessageContainer.add(msg, callback); - lastSendFuture = callback.getFuture(); - payload.release(); - if (isBatchFull) { - batchMessageAndSend(); + // Should flush the last potential duplicated since can't combine potential duplicated 
messages + // and non-duplicated messages into a batch. + if (isLastSequenceIdPotentialDuplicated) { + doBatchSendAndAdd(msg, callback, payload); + } else { + // handle boundary cases where message being added would exceed + // batch size and/or max message size + boolean isBatchFull = batchMessageContainer.add(msg, callback); + lastSendFuture = callback.getFuture(); + payload.release(); + if (isBatchFull) { + batchMessageAndSend(); + } } + isLastSequenceIdPotentialDuplicated = false; } } else { doBatchSendAndAdd(msg, callback, payload);
