This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 05631bdb876 [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
05631bdb876 is described below

commit 05631bdb876defeec2707da1b2ee76d91fc8210c
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Dec 19 23:59:35 2022 -0600

    [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987)
    
    In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.
    
    Here is the single `BaseCommand`:
    
    https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99
    
    Here is the method call that resets the object:
    
    https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114
    
    Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.
    
    This PR copies relevant values or objects into other variables.
    
    * Replace `command` with `tcId`, since the latter is a final variable meant to be published to another thread.
    * Move the logic that copies certain command fields earlier in the `handleSubscribe` method.
    * Copy the `ack` object into a new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured.
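    
    As a rough illustration of the hazard and of the copy-before-publish approach described above, here is a minimal, single-threaded sketch. `ReusedCommandSketch`, `MutableAck`, and `run` are hypothetical names invented for this example and are not Pulsar classes. `MutableAck` only imitates the `clear()`/`copyFrom()` behavior of a generated proto command, and a manually completed `CompletableFuture` stands in for a callback that runs after the decoder has moved on to the next frame.
    
```java
import java.util.concurrent.CompletableFuture;

public class ReusedCommandSketch {

    // Hypothetical stand-in for a generated proto command (not a Pulsar class).
    static final class MutableAck {
        private long consumerId;

        MutableAck setConsumerId(long consumerId) {
            this.consumerId = consumerId;
            return this;
        }

        long getConsumerId() {
            return consumerId;
        }

        // Resets every field, as a decoder would before parsing the next frame.
        MutableAck clear() {
            consumerId = 0L;
            return this;
        }

        // Copies all fields from another instance into this one.
        MutableAck copyFrom(MutableAck other) {
            this.consumerId = other.consumerId;
            return this;
        }
    }

    // Returns { value seen through the shared reference, value seen through the copy }.
    static long[] run() {
        // The single reusable instance, currently holding the first frame.
        MutableAck shared = new MutableAck().setConsumerId(1L);
        // Stands in for asynchronous work that finishes after the next frame arrives.
        CompletableFuture<Void> gate = new CompletableFuture<>();

        // Unsafe: the callback captures the shared, reusable instance.
        CompletableFuture<Long> unsafe = gate.thenApply(v -> shared.getConsumerId());

        // Safer: copy the fields into a fresh object before publishing it.
        final MutableAck copy = new MutableAck().copyFrom(shared);
        CompletableFuture<Long> safe = gate.thenApply(v -> copy.getConsumerId());

        // The decoder reuses the instance for the next frame.
        shared.clear().setConsumerId(2L);

        // Only now do the callbacks run.
        gate.complete(null);
        return new long[] {unsafe.join(), safe.join()};
    }

    public static void main(String[] args) {
        long[] seen = run();
        System.out.println("shared reference saw consumerId=" + seen[0]);
        System.out.println("copy saw consumerId=" + seen[1]);
    }
}
```
    
    In this sketch, the callback that captured the shared instance observes the values of the later frame, while the callback that captured the copy still sees the values that were present when it was registered.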
    
    This is a trivial change that is already covered by tests.
    
    - [x] `doc-not-needed`
    
    This is an internal change.
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/8
    
    (cherry picked from commit a408e9e392d48dcda7c17cd9b9e85e530c94998d)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7d10a43cfe2..cfbbef1ea0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1475,6 +1475,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final boolean hasRequestId = ack.hasRequestId();
         final long requestId = hasRequestId ? ack.getRequestId() : 0;
         final long consumerId = ack.getConsumerId();
+        final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
@@ -1484,7 +1485,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                     requestId, null, null, consumerId));
                         }
                         if (getBrokerService().getInterceptor() != null) {
-                            getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
+                            getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
                         }
                     }).exceptionally(e -> {
                         if (hasRequestId) {
