This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb0f6b786d [fix][broker] Copy command fields and fix potential
thread-safety in ServerCnx (#19517)
0bb0f6b786d is described below
commit 0bb0f6b786d115a7405867b701521cd4a49340c5
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Feb 23 13:54:27 2023 +0100
[fix][broker] Copy command fields and fix potential thread-safety in
ServerCnx (#19517)
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index adee29c5a01..242947f6a0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2438,10 +2438,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
final TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
final TransactionCoordinatorID tcId =
TransactionCoordinatorID.get(command.getTxnidMostBits());
final long requestId = command.getRequestId();
+ final List<String> partitionsList = command.getPartitionsList();
if (log.isDebugEnabled()) {
- command.getPartitionsList().forEach(partion ->
+ partitionsList.forEach(partition ->
log.debug("Receive add published partition to txn request
{} "
- + "from {} with txnId {}, topic: [{}]", requestId,
remoteAddress, txnID, partion));
+ + "from {} with txnId {}, topic: [{}]", requestId,
remoteAddress, txnID, partition));
}
if (!checkTransactionEnableAndSendError(requestId)) {
@@ -2456,7 +2457,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService
- .addProducedPartitionToTxn(txnID,
command.getPartitionsList());
+ .addProducedPartitionToTxn(txnID, partitionsList);
})
.whenComplete((v, ex) -> {
if (ex == null) {