This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 094d90e  [pulsar-client] Remove UUID generation on sending message (#7705)
094d90e is described below

commit 094d90ee22b92bd4da0f4af30b28d798f6770fee
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Aug 2 12:48:50 2020 -0700

    [pulsar-client] Remove UUID generation on sending message (#7705)
    
    * [pulsar-client] Remove UUID generation on sending message
    
    * fix prod name
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 25 +++++++++++-----------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5ff8c3d..b2f00b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -47,7 +47,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -411,9 +410,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         try {
             synchronized (this) {
                 int readStartIndex = 0;
-                String uuid = UUID.randomUUID().toString();
+                long sequenceId;
+                if (!msgMetadataBuilder.hasSequenceId()) {
+                    sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
+                    msgMetadataBuilder.setSequenceId(sequenceId);
+                } else {
+                    sequenceId = msgMetadataBuilder.getSequenceId();
+                }
+                String uuid = totalChunks > 1 ? String.format("%s-%d", 
producerName, sequenceId) : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    serializeAndSendMessage(msg, msgMetadataBuilder, payload, 
uuid, chunkId, totalChunks,
+                    serializeAndSendMessage(msg, msgMetadataBuilder, payload, 
sequenceId, uuid, chunkId, totalChunks,
                             readStartIndex, ClientCnx.getMaxMessageSize(), 
compressedPayload,
                             compressedPayload.readableBytes(), 
uncompressedSize, callback);
                     readStartIndex = ((chunkId + 1) * 
ClientCnx.getMaxMessageSize());
@@ -428,7 +434,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private void serializeAndSendMessage(MessageImpl<?> msg, Builder 
msgMetadataBuilder, ByteBuf payload,
-            String uuid, int chunkId, int totalChunks, int readStartIndex, int 
chunkMaxSizeInBytes, ByteBuf compressedPayload,
+            long sequenceId, String uuid, int chunkId, int totalChunks, int 
readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
             int compressedPayloadSize,
             int uncompressedSize, SendCallback callback) throws IOException, 
InterruptedException {
         ByteBuf chunkPayload = compressedPayload;
@@ -442,18 +448,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 chunkPayload.retain();
                 chunkMsgMetadataBuilder = msgMetadataBuilder.clone();
             }
-            chunkMsgMetadataBuilder.setUuid(uuid);
+            if (uuid != null) {
+                chunkMsgMetadataBuilder.setUuid(uuid);
+            }
             chunkMsgMetadataBuilder.setChunkId(chunkId);
             chunkMsgMetadataBuilder.setNumChunksFromMsg(totalChunks);
             
chunkMsgMetadataBuilder.setTotalChunkMsgSize(compressedPayloadSize);
         }
-        long sequenceId;
-        if (!chunkMsgMetadataBuilder.hasSequenceId()) {
-            sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
-            chunkMsgMetadataBuilder.setSequenceId(sequenceId);
-        } else {
-            sequenceId = chunkMsgMetadataBuilder.getSequenceId();
-        }
         if (!chunkMsgMetadataBuilder.hasPublishTime()) {
             
chunkMsgMetadataBuilder.setPublishTime(client.getClientClock().millis());
 

Reply via email to