This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 26b34d405099f23be04cd22549b089c0262512fa
Author: lipenghui <[email protected]>
AuthorDate: Fri Feb 14 16:05:58 2020 +0800

    Flush the last potentially duplicated message first, since potentially 
duplicated messages and non-duplicated messages cannot be combined into one batch. (#6326)
    
    Fixes #6273
    
    Motivation
    The root cause of #6273 is that potentially duplicated messages and 
non-duplicated messages were combined into the same batch. The potentially duplicated 
message therefore needs to be flushed first, and only then should the non-duplicated messages be added to a new batch.
---
 .../pulsar/client/api/ClientDeduplicationTest.java | 12 ++++++-----
 .../client/impl/BatchMessageContainerImpl.java     |  3 ++-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 24 ++++++++++++++--------
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 97e7869..09bfb79 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -182,11 +182,12 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         producer.flush();
 
         // Repeat the messages and verify they're not received by consumer
-        
producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync();
-        
producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync();
-        producer.close();
+        
producer.newMessage().value("my-message-0".getBytes()).sequenceId(2).sendAsync();
+        
producer.newMessage().value("my-message-1".getBytes()).sequenceId(4).sendAsync();
+        
producer.newMessage().value("my-message-3".getBytes()).sequenceId(6).sendAsync();
+        producer.flush();
 
-        for (int i = 0; i < 3; i++) {
+        for (int i = 0; i < 4; i++) {
             Message<byte[]> msg = consumer.receive();
             assertEquals(new String(msg.getData()), "my-message-" + i);
             consumer.acknowledge(msg);
@@ -196,11 +197,12 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
         assertNull(msg);
 
+        producer.close();
         // Kill and restart broker
         restartBroker();
 
         producer = producerBuilder.create();
-        assertEquals(producer.getLastSequenceId(), 5L);
+        assertEquals(producer.getLastSequenceId(), 6L);
 
         // Repeat the messages and verify they're not received by consumer
         
producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index d4772ce..bcbd1c3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -69,6 +69,7 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         if (++numMessagesInBatch == 1) {
             // some properties are common amongst the different messages in 
the batch, hence we just pick it up from
             // the first message
+            messageMetadata.setSequenceId(msg.getSequenceId());
             lowestSequenceId = 
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
             this.firstCallback = callback;
             batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
@@ -87,7 +88,7 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
             messageMetadata.setSequenceId(lowestSequenceId);
         }
         highestSequenceId = msg.getSequenceId();
-        producer.lastSequenceIdPushed = msg.getSequenceId();
+        producer.lastSequenceIdPushed = 
Math.max(producer.lastSequenceIdPushed, msg.getSequenceId());
 
         return isBatchFull();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 7805fa8..776300d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -70,7 +70,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
@@ -114,6 +113,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     private volatile long lastSequenceIdPublished;
     protected volatile long lastSequenceIdPushed;
+    private volatile boolean isLastSequenceIdPotentialDuplicated;
 
     private MessageCrypto msgCrypto = null;
 
@@ -397,6 +397,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                         // should trigger complete the batch message, new 
message will add to a new batch and new batch
                         // sequence id use the new message, so that broker can 
handle the message duplication
                         if (sequenceId <= lastSequenceIdPushed) {
+                            isLastSequenceIdPotentialDuplicated = true;
                             if (sequenceId <= lastSequenceIdPublished) {
                                 log.warn("Message with sequence id {} is 
definitely a duplicate", sequenceId);
                             } else {
@@ -405,14 +406,21 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             }
                             doBatchSendAndAdd(msg, callback, payload);
                         } else {
-                            // handle boundary cases where message being added 
would exceed
-                            // batch size and/or max message size
-                            boolean isBatchFull = 
batchMessageContainer.add(msg, callback);
-                            lastSendFuture = callback.getFuture();
-                            payload.release();
-                            if (isBatchFull) {
-                                batchMessageAndSend();
+                            // Should flush the last potential duplicated 
since can't combine potential duplicated messages
+                            // and non-duplicated messages into a batch.
+                            if (isLastSequenceIdPotentialDuplicated) {
+                                doBatchSendAndAdd(msg, callback, payload);
+                            } else {
+                                // handle boundary cases where message being 
added would exceed
+                                // batch size and/or max message size
+                                boolean isBatchFull = 
batchMessageContainer.add(msg, callback);
+                                lastSendFuture = callback.getFuture();
+                                payload.release();
+                                if (isBatchFull) {
+                                    batchMessageAndSend();
+                                }
                             }
+                            isLastSequenceIdPotentialDuplicated = false;
                         }
                     } else {
                         doBatchSendAndAdd(msg, callback, payload);

Reply via email to