This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0717c8b65ab [fix] [cli] the variable producerName of BatchMsgContainer
is null (#20819)
0717c8b65ab is described below
commit 0717c8b65ab4f775151f83acb0a41b227be951b7
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 18 11:22:41 2023 +0800
[fix] [cli] the variable producerName of BatchMsgContainer is null (#20819)
Motivation: If the producer name is generated by the broker, the producer
updates its own `producerName` variable after connecting, but it does not
update the copy held by the batch message container, so that copy stays null.
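Modifications: remove the cached `producerName` field from the batch message
container and read the name via `producer.getProducerName()` wherever it is
needed; also add the uncompressed size to the related debug logs.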
(cherry picked from commit aba50f2a276412bc43a4652b9ce303d384f71966)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 5 +++--
.../client/impl/AbstractBatchMessageContainer.java | 2 --
.../pulsar/client/impl/BatchMessageContainerImpl.java | 16 ++++++++++++----
.../client/impl/BatchMessageKeyBasedContainer.java | 4 ++--
4 files changed, 17 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 130d312c2a2..1bc7accc0ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1741,11 +1741,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
headersAndPayload.resetReaderIndex();
if (log.isDebugEnabled()) {
log.debug("[{}] Received send message request. producer: {}:{}
{}:{} size: {},"
- + " partition key is: {}, ordering key is {}",
+ + " partition key is: {}, ordering key is {},
uncompressedSize is {}",
remoteAddress, send.getProducerId(), send.getSequenceId(),
msgMetadata.getProducerName(),
msgMetadata.getSequenceId(),
headersAndPayload.readableBytes(),
msgMetadata.hasPartitionKey() ?
msgMetadata.getPartitionKey() : null,
- msgMetadata.hasOrderingKey() ?
msgMetadata.getOrderingKey() : null);
+ msgMetadata.hasOrderingKey() ?
msgMetadata.getOrderingKey() : null,
+ msgMetadata.getUncompressedSize());
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 1827142cdfa..e81365d3886 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -35,7 +35,6 @@ public abstract class AbstractBatchMessageContainer
implements BatchMessageConta
protected CompressionType compressionType;
protected CompressionCodec compressor;
protected String topicName;
- protected String producerName;
protected ProducerImpl producer;
protected int maxNumMessagesInBatch;
@@ -108,7 +107,6 @@ public abstract class AbstractBatchMessageContainer
implements BatchMessageConta
public void setProducer(ProducerImpl<?> producer) {
this.producer = producer;
this.topicName = producer.getTopic();
- this.producerName = producer.getProducerName();
this.compressionType = CompressionCodecProvider
.convertToWireProtocol(producer.getConfiguration().getCompressionType());
this.compressor =
CompressionCodecProvider.getCompressionCodec(compressionType);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index fdbf1f15c29..9be7210a387 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -89,8 +89,8 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] add message to batch, num messages in batch
so far {}", topicName, producerName,
- numMessagesInBatch);
+ log.debug("[{}] [{}] add message to batch, num messages in batch
so far {}", topicName,
+ producer.getProducerName(), numMessagesInBatch);
}
if (++numMessagesInBatch == 1) {
@@ -234,8 +234,8 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
batchedMessageMetadataAndPayload = null;
}
} catch (Throwable t) {
- log.warn("[{}] [{}] Got exception while completing the callback
for msg {}:", topicName, producerName,
- lowestSequenceId, t);
+ log.warn("[{}] [{}] Got exception while completing the callback
for msg {}:", topicName,
+ producer.getProducerName(), lowestSequenceId, t);
}
clear();
}
@@ -304,6 +304,14 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
ByteBufPair cmd = producer.sendMessage(producer.producerId,
messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), numMessagesInBatch,
messageMetadata, encryptedPayload);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Build batch msg seq:{}, highest-seq:{},
numMessagesInBatch: {}, uncompressedSize: {},"
+ + " payloadSize: {}", topicName,
producer.getProducerName(),
+ messageMetadata.getSequenceId(),
messageMetadata.getNumMessagesInBatch(),
+ messageMetadata.getHighestSequenceId(),
+ messageMetadata.getUncompressedSize(),
encryptedPayload.readableBytes());
+ }
+
OpSendMsg op = OpSendMsg.create(messages, cmd,
messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback,
batchAllocatedSizeBytes);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 45d683e72b0..e2ce9e2d0bd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -43,8 +43,8 @@ class BatchMessageKeyBasedContainer extends
AbstractBatchMessageContainer {
@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] add message to batch, num messages in batch
so far is {}", topicName, producerName,
- numMessagesInBatch);
+ log.debug("[{}] [{}] add message to batch, num messages in batch
so far is {}", topicName,
+ producer.getProducerName(), numMessagesInBatch);
}
String key = getKey(msg);
final BatchMessageContainerImpl batchMessageContainer =
batches.computeIfAbsent(key,