This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new e91ad2e  [Java Client] Fix producer data race to get cnx (#13176)
e91ad2e is described below

commit e91ad2e760336f3525068f4e8773c70d24d4a13b
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Dec 7 15:34:42 2021 -0600

    [Java Client] Fix producer data race to get cnx (#13176)
    
    (cherry picked from commit 42469de02172324f974947c4b5186e897d430e09)
---
 .../pulsar/client/impl/PulsarTestClient.java       |  6 ++--
 .../apache/pulsar/client/impl/ProducerImpl.java    | 32 +++++++++++++---------
 2 files changed, 22 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index eebcf5b..8136cf0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -151,11 +151,11 @@ public class PulsarTestClient extends PulsarClientImpl {
             }
 
             @Override
-            protected boolean shouldWriteOpSendMsg() {
+            protected ClientCnx getCnxIfReady() {
                 if (dropOpSendMessages) {
-                    return false;
+                    return null;
                 } else {
-                    return super.shouldWriteOpSendMsg();
+                    return super.getCnxIfReady();
                 }
             }
         };
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b8cc821..ccb622f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -904,7 +904,22 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     @Override
     public boolean isConnected() {
-        return connectionHandler.cnx() != null && (getState() == State.Ready);
+        return getCnxIfReady() != null;
+    }
+
+    /**
+     * Hook method for testing. By returning null, it's possible to prevent 
messages
+     * being delivered to the broker.
+     *
+     * @return cnx if OpSend messages should be written to open connection. 
Caller must
+     * verify that the returned cnx is not null before using reference.
+     */
+    protected ClientCnx getCnxIfReady() {
+        if (getState() == State.Ready) {
+            return connectionHandler.cnx();
+        } else {
+            return null;
+        }
     }
 
     @Override
@@ -1793,8 +1808,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
                         last -> Math.max(last, getHighestSequenceId(op)));
             }
-            if (shouldWriteOpSendMsg()) {
-                ClientCnx cnx = cnx();
+
+            final ClientCnx cnx = getCnxIfReady();
+            if (cnx != null) {
                 if (op.msg != null && op.msg.getSchemaState() == None) {
                     tryRegisterSchema(cnx, op.msg, op.callback, 
this.connectionHandler.getEpoch());
                     return;
@@ -1817,16 +1833,6 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
     }
 
-    /**
-     * Hook method for testing. By returning false, it's possible to prevent 
messages
-     * being delivered to the broker.
-     *
-     * @return true if OpSend messages should be written to open connection
-     */
-    protected boolean shouldWriteOpSendMsg() {
-        return isConnected();
-    }
-
     // Must acquire a lock on ProducerImpl.this before calling method.
     private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, 
long expectedEpoch) {
         if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == 
null) {

Reply via email to