This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 4dae6cbf60b [fix] [cli] the variable producerName of BatchMsgContainer 
is null (#20819)
4dae6cbf60b is described below

commit 4dae6cbf60b51bc19b37ec6e393db3b5a1a57661
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 18 11:22:41 2023 +0800

    [fix] [cli] the variable producerName of BatchMsgContainer is null (#20819)
    
    Motivation: If the producer name is generated by the broker, the producer 
updates its own `producerName` field after connecting, but the copy of that 
name held by the batch message container is never updated and stays null.
    
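    Modifications: remove the cached `producerName` field from the batch message container and read `producer.getProducerName()` wherever the name is logged; also add the uncompressed size to the broker's send-request debug log and add a debug log when a batch message is built.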
    (cherry picked from commit aba50f2a276412bc43a4652b9ce303d384f71966)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java |  5 +++--
 .../client/impl/AbstractBatchMessageContainer.java       |  2 --
 .../pulsar/client/impl/BatchMessageContainerImpl.java    | 16 ++++++++++++----
 .../client/impl/BatchMessageKeyBasedContainer.java       |  4 ++--
 4 files changed, 17 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e369e7d7b68..c9d8a2ff700 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1538,11 +1538,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         headersAndPayload.resetReaderIndex();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received send message request. producer: {}:{} 
{}:{} size: {},"
-                            + " partition key is: {}, ordering key is {}",
+                            + " partition key is: {}, ordering key is {}, 
uncompressedSize is {}",
                     remoteAddress, send.getProducerId(), send.getSequenceId(), 
msgMetadata.getProducerName(),
                     msgMetadata.getSequenceId(), 
headersAndPayload.readableBytes(),
                     msgMetadata.hasPartitionKey() ? 
msgMetadata.getPartitionKey() : null,
-                    msgMetadata.hasOrderingKey() ? 
msgMetadata.getOrderingKey() : null);
+                    msgMetadata.hasOrderingKey() ? 
msgMetadata.getOrderingKey() : null,
+                    msgMetadata.getUncompressedSize());
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 9b4d1b7d683..f762b5ab450 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -35,7 +35,6 @@ public abstract class AbstractBatchMessageContainer 
implements BatchMessageConta
     protected CompressionType compressionType;
     protected CompressionCodec compressor;
     protected String topicName;
-    protected String producerName;
     protected ProducerImpl producer;
 
     protected int maxNumMessagesInBatch;
@@ -98,7 +97,6 @@ public abstract class AbstractBatchMessageContainer 
implements BatchMessageConta
     public void setProducer(ProducerImpl<?> producer) {
         this.producer = producer;
         this.topicName = producer.getTopic();
-        this.producerName = producer.getProducerName();
         this.compressionType = CompressionCodecProvider
                 
.convertToWireProtocol(producer.getConfiguration().getCompressionType());
         this.compressor = 
CompressionCodecProvider.getCompressionCodec(compressionType);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 37b522d8902..f657816b6dd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -80,8 +80,8 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far {}", topicName, producerName,
-                    numMessagesInBatch);
+            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far {}", topicName,
+                    producer.getProducerName(), numMessagesInBatch);
         }
 
         if (++numMessagesInBatch == 1) {
@@ -193,8 +193,8 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
                 firstCallback.sendComplete(ex);
             }
         } catch (Throwable t) {
-            log.warn("[{}] [{}] Got exception while completing the callback 
for msg {}:", topicName, producerName,
-                    lowestSequenceId, t);
+            log.warn("[{}] [{}] Got exception while completing the callback 
for msg {}:", topicName,
+                    producer.getProducerName(), lowestSequenceId, t);
         }
         clear();
     }
@@ -226,6 +226,14 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         ByteBufPair cmd = producer.sendMessage(producer.producerId, 
messageMetadata.getSequenceId(),
                 messageMetadata.getHighestSequenceId(), numMessagesInBatch, 
messageMetadata, encryptedPayload);
 
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] Build batch msg seq:{}, highest-seq:{}, 
numMessagesInBatch: {}, uncompressedSize: {},"
+                            + " payloadSize: {}", topicName, 
producer.getProducerName(),
+                    messageMetadata.getSequenceId(), 
messageMetadata.getNumMessagesInBatch(),
+                    messageMetadata.getHighestSequenceId(),
+                    messageMetadata.getUncompressedSize(), 
encryptedPayload.readableBytes());
+        }
+
         OpSendMsg op = OpSendMsg.create(messages, cmd, 
messageMetadata.getSequenceId(),
                 messageMetadata.getHighestSequenceId(), firstCallback);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 7614728353f..272f2dd54a0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -54,8 +54,8 @@ class BatchMessageKeyBasedContainer extends 
AbstractBatchMessageContainer {
     @Override
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
         if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far is {}", topicName, producerName,
-                    numMessagesInBatch);
+            log.debug("[{}] [{}] add message to batch, num messages in batch 
so far is {}", topicName,
+                    producer.getProducerName(), numMessagesInBatch);
         }
         numMessagesInBatch++;
         currentBatchSizeBytes += msg.getDataBuffer().readableBytes();

Reply via email to