This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0717c8b65ab [fix] [cli] the variable producerName of BatchMsgContainer 
is null (#20819)
0717c8b65ab is described below

commit 0717c8b65ab4f775151f83acb0a41b227be951b7
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 18 11:22:41 2023 +0800

    [fix] [cli] the variable producerName of BatchMsgContainer is null (#20819)
    
    Motivation: If the producer name is generated by the Broker, the producer 
will update the variable `producerName` after connecting, but not update the 
same variable of the batch message container.
    
    Modifications: fix bug
    (cherry picked from commit aba50f2a276412bc43a4652b9ce303d384f71966)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java |  5 +++--
 .../client/impl/AbstractBatchMessageContainer.java       |  2 --
 .../pulsar/client/impl/BatchMessageContainerImpl.java    | 16 ++++++++++++----
 .../client/impl/BatchMessageKeyBasedContainer.java       |  4 ++--
 4 files changed, 17 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 130d312c2a2..1bc7accc0ad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1741,11 +1741,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         headersAndPayload.resetReaderIndex();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received send message request. producer: {}:{} 
{}:{} size: {},"
-                            + " partition key is: {}, ordering key is {}",
+                            + " partition key is: {}, ordering key is {}, 
uncompressedSize is {}",
                     remoteAddress, send.getProducerId(), send.getSequenceId(), 
msgMetadata.getProducerName(),
                     msgMetadata.getSequenceId(), 
headersAndPayload.readableBytes(),
                     msgMetadata.hasPartitionKey() ? 
msgMetadata.getPartitionKey() : null,
-                    msgMetadata.hasOrderingKey() ? 
msgMetadata.getOrderingKey() : null);
+                    msgMetadata.hasOrderingKey() ? 
msgMetadata.getOrderingKey() : null,
+                    msgMetadata.getUncompressedSize());
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 1827142cdfa..e81365d3886 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -35,7 +35,6 @@ public abstract class AbstractBatchMessageContainer 
implements BatchMessageConta
     protected CompressionType compressionType;
     protected CompressionCodec compressor;
     protected String topicName;
-    protected String producerName;
     protected ProducerImpl producer;
 
     protected int maxNumMessagesInBatch;
@@ -108,7 +107,6 @@ public abstract class AbstractBatchMessageContainer 
implements BatchMessageConta
     public void setProducer(ProducerImpl<?> producer) {
         this.producer = producer;
         this.topicName = producer.getTopic();
-        this.producerName = producer.getProducerName();
         this.compressionType = CompressionCodecProvider
                 
.convertToWireProtocol(producer.getConfiguration().getCompressionType());
         this.compressor = 
CompressionCodecProvider.getCompressionCodec(compressionType);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index fdbf1f15c29..9be7210a387 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -89,8 +89,8 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far {}", topicName, producerName,
-                    numMessagesInBatch);
+            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far {}", topicName,
+                    producer.getProducerName(), numMessagesInBatch);
         }
 
         if (++numMessagesInBatch == 1) {
@@ -234,8 +234,8 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
                 batchedMessageMetadataAndPayload = null;
             }
         } catch (Throwable t) {
-            log.warn("[{}] [{}] Got exception while completing the callback 
for msg {}:", topicName, producerName,
-                    lowestSequenceId, t);
+            log.warn("[{}] [{}] Got exception while completing the callback 
for msg {}:", topicName,
+                    producer.getProducerName(), lowestSequenceId, t);
         }
         clear();
     }
@@ -304,6 +304,14 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         ByteBufPair cmd = producer.sendMessage(producer.producerId, 
messageMetadata.getSequenceId(),
                 messageMetadata.getHighestSequenceId(), numMessagesInBatch, 
messageMetadata, encryptedPayload);
 
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] Build batch msg seq:{}, highest-seq:{}, 
numMessagesInBatch: {}, uncompressedSize: {},"
+                            + " payloadSize: {}", topicName, 
producer.getProducerName(),
+                    messageMetadata.getSequenceId(), 
messageMetadata.getNumMessagesInBatch(),
+                    messageMetadata.getHighestSequenceId(),
+                    messageMetadata.getUncompressedSize(), 
encryptedPayload.readableBytes());
+        }
+
         OpSendMsg op = OpSendMsg.create(messages, cmd, 
messageMetadata.getSequenceId(),
                 messageMetadata.getHighestSequenceId(), firstCallback, 
batchAllocatedSizeBytes);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 45d683e72b0..e2ce9e2d0bd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -43,8 +43,8 @@ class BatchMessageKeyBasedContainer extends 
AbstractBatchMessageContainer {
     @Override
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
         if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far is {}", topicName, producerName,
-                    numMessagesInBatch);
+            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far is {}", topicName,
+                    producer.getProducerName(), numMessagesInBatch);
         }
         String key = getKey(msg);
         final BatchMessageContainerImpl batchMessageContainer = 
batches.computeIfAbsent(key,

Reply via email to