This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 05631bdb876 [fix][broker] Copy proto command fields into final
variables in ServerCnx (#18987)
05631bdb876 is described below
commit 05631bdb876defeec2707da1b2ee76d91fc8210c
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Dec 19 23:59:35 2022 -0600
[fix][broker] Copy proto command fields into final variables in ServerCnx
(#18987)
In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite
it for each incoming protocol message. As a result, it is not safe to publish
any references to a proto command to other threads.
Here is the single `BaseCommand`:
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99
Here is the method call that resets the object:
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114
Note that the call to `parseFrom` first calls `clear()`, which resets all
values on the object.
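The hazard described above can be sketched as follows. This is a minimal illustration, not Pulsar code: `FakeCommand`, its `parseFrom(long)` signature, and `ReusedCommandDemo` are hypothetical stand-ins for a reused proto command, and the deferred callbacks stand in for work that runs after the handler has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class ReusedCommandDemo {

    /** Hypothetical stand-in for a proto command that is reused for every message. */
    static final class FakeCommand {
        private long consumerId;

        FakeCommand clear() {
            consumerId = 0;
            return this;
        }

        /** Mimics the reset-then-populate behaviour described for parseFrom. */
        void parseFrom(long incomingConsumerId) {
            clear();
            consumerId = incomingConsumerId;
        }

        long getConsumerId() {
            return consumerId;
        }
    }

    /** Returns {value seen through the shared reference, value seen through the final copy}. */
    static long[] run() {
        FakeCommand shared = new FakeCommand(); // one instance, like the decoder's single command
        List<LongSupplier> deferred = new ArrayList<>();

        shared.parseFrom(1L); // message #1 arrives
        deferred.add(() -> shared.getConsumerId()); // unsafe: captures the mutable object
        final long consumerId = shared.getConsumerId(); // safe: value copied while still valid
        deferred.add(() -> consumerId);

        shared.parseFrom(2L); // message #2 overwrites the same instance

        // The deferred work only runs now, after the overwrite.
        return new long[] {deferred.get(0).getAsLong(), deferred.get(1).getAsLong()};
    }

    public static void main(String[] args) {
        long[] seen = run();
        System.out.println("via shared reference: " + seen[0]);
        System.out.println("via final copy: " + seen[1]);
    }
}
```

The callback that captured the shared object observes the second message's value, while the one that captured the final local still sees the first.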
This PR copies relevant values or objects into other variables.
* Replace `command` with `tcId` since the latter is a final variable meant
to be published to another thread.
* Move the logic that copies certain command fields earlier in the
`handleSubscribe` method.
* Copy the `ack` object into a new `CommandAck` when a broker interceptor is
configured. Copying this command is likely somewhat costly, so we only do it
in that case.
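The conditional copy in the last bullet can be sketched roughly like this. It is a simplified illustration under stated assumptions: `FakeAck`, `Interceptor`, and `ConditionalCopyDemo` are hypothetical types, and only the `new ...().copyFrom(ack)` shape is taken from the diff below.

```java
import java.util.concurrent.CompletableFuture;

public class ConditionalCopyDemo {

    /** Hypothetical mutable command; copyFrom mirrors the call shape used in the diff. */
    static final class FakeAck {
        long consumerId;

        FakeAck copyFrom(FakeAck other) {
            this.consumerId = other.consumerId;
            return this;
        }
    }

    /** Hypothetical interceptor hook that runs after the ack completes. */
    interface Interceptor {
        void messageAcked(FakeAck ack);
    }

    static CompletableFuture<Void> handleAck(FakeAck ack, Interceptor interceptor,
                                             CompletableFuture<Void> ackDone) {
        // Pay for the copy only when something will read the command later.
        final FakeAck finalAck = interceptor != null ? new FakeAck().copyFrom(ack) : null;
        return ackDone.thenRun(() -> {
            if (interceptor != null) {
                interceptor.messageAcked(finalAck); // reads the private copy, not the shared object
            }
        });
    }

    /** Returns the consumer id the interceptor observed. */
    static long run() {
        FakeAck shared = new FakeAck();
        shared.consumerId = 7L;
        long[] observed = new long[1];
        CompletableFuture<Void> ackDone = new CompletableFuture<>();

        CompletableFuture<Void> result =
                handleAck(shared, a -> observed[0] = a.consumerId, ackDone);

        shared.consumerId = 99L; // the shared instance is reused for the next message
        ackDone.complete(null);  // the asynchronous ack finishes afterwards
        result.join();
        return observed[0];
    }

    public static void main(String[] args) {
        System.out.println("interceptor saw consumerId " + run());
    }
}
```

When no interceptor is passed, `finalAck` stays `null` and no copy is made, which is the trade-off the bullet describes.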
This is a trivial change that is already covered by tests.
- [x] `doc-not-needed`
This is an internal change.
PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/8
(cherry picked from commit a408e9e392d48dcda7c17cd9b9e85e530c94998d)
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7d10a43cfe2..cfbbef1ea0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1475,6 +1475,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final boolean hasRequestId = ack.hasRequestId();
         final long requestId = hasRequestId ? ack.getRequestId() : 0;
         final long consumerId = ack.getConsumerId();
+        final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
@@ -1484,7 +1485,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             requestId, null, null, consumerId));
                 }
                 if (getBrokerService().getInterceptor() != null) {
-                    getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
+                    getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
                 }
             }).exceptionally(e -> {
                 if (hasRequestId) {