This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 094d90e [pulsar-client] Remove UUID generation on sending message
(#7705)
094d90e is described below
commit 094d90ee22b92bd4da0f4af30b28d798f6770fee
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Aug 2 12:48:50 2020 -0700
[pulsar-client] Remove UUID generation on sending message (#7705)
* [pulsar-client] Remove UUID generation on sending message
* fix prod name
---
.../apache/pulsar/client/impl/ProducerImpl.java | 25 +++++++++++-----------
1 file changed, 13 insertions(+), 12 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5ff8c3d..b2f00b2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -47,7 +47,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
@@ -411,9 +410,16 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
try {
synchronized (this) {
int readStartIndex = 0;
- String uuid = UUID.randomUUID().toString();
+ long sequenceId;
+ if (!msgMetadataBuilder.hasSequenceId()) {
+ sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
+ msgMetadataBuilder.setSequenceId(sequenceId);
+ } else {
+ sequenceId = msgMetadataBuilder.getSequenceId();
+ }
+ String uuid = totalChunks > 1 ? String.format("%s-%d",
producerName, sequenceId) : null;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
- serializeAndSendMessage(msg, msgMetadataBuilder, payload,
uuid, chunkId, totalChunks,
+ serializeAndSendMessage(msg, msgMetadataBuilder, payload,
sequenceId, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(),
compressedPayload,
compressedPayload.readableBytes(),
uncompressedSize, callback);
readStartIndex = ((chunkId + 1) *
ClientCnx.getMaxMessageSize());
@@ -428,7 +434,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
private void serializeAndSendMessage(MessageImpl<?> msg, Builder
msgMetadataBuilder, ByteBuf payload,
- String uuid, int chunkId, int totalChunks, int readStartIndex, int
chunkMaxSizeInBytes, ByteBuf compressedPayload,
+ long sequenceId, String uuid, int chunkId, int totalChunks, int
readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
int compressedPayloadSize,
int uncompressedSize, SendCallback callback) throws IOException,
InterruptedException {
ByteBuf chunkPayload = compressedPayload;
@@ -442,18 +448,13 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
chunkPayload.retain();
chunkMsgMetadataBuilder = msgMetadataBuilder.clone();
}
- chunkMsgMetadataBuilder.setUuid(uuid);
+ if (uuid != null) {
+ chunkMsgMetadataBuilder.setUuid(uuid);
+ }
chunkMsgMetadataBuilder.setChunkId(chunkId);
chunkMsgMetadataBuilder.setNumChunksFromMsg(totalChunks);
chunkMsgMetadataBuilder.setTotalChunkMsgSize(compressedPayloadSize);
}
- long sequenceId;
- if (!chunkMsgMetadataBuilder.hasSequenceId()) {
- sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
- chunkMsgMetadataBuilder.setSequenceId(sequenceId);
- } else {
- sequenceId = chunkMsgMetadataBuilder.getSequenceId();
- }
if (!chunkMsgMetadataBuilder.hasPublishTime()) {
chunkMsgMetadataBuilder.setPublishTime(client.getClientClock().millis());