This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new e91ad2e [Java Client] Fix producer data race to get cnx (#13176)
e91ad2e is described below
commit e91ad2e760336f3525068f4e8773c70d24d4a13b
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Dec 7 15:34:42 2021 -0600
[Java Client] Fix producer data race to get cnx (#13176)
(cherry picked from commit 42469de02172324f974947c4b5186e897d430e09)
---
.../pulsar/client/impl/PulsarTestClient.java | 6 ++--
.../apache/pulsar/client/impl/ProducerImpl.java | 32 +++++++++++++---------
2 files changed, 22 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index eebcf5b..8136cf0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -151,11 +151,11 @@ public class PulsarTestClient extends PulsarClientImpl {
}
@Override
- protected boolean shouldWriteOpSendMsg() {
+ protected ClientCnx getCnxIfReady() {
if (dropOpSendMessages) {
- return false;
+ return null;
} else {
- return super.shouldWriteOpSendMsg();
+ return super.getCnxIfReady();
}
}
};
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b8cc821..ccb622f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -904,7 +904,22 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
@Override
public boolean isConnected() {
- return connectionHandler.cnx() != null && (getState() == State.Ready);
+ return getCnxIfReady() != null;
+ }
+
+ /**
+ * Hook method for testing. By returning null, it's possible to prevent
messages
+ * being delivered to the broker.
+ *
+ * @return cnx if OpSend messages should be written to open connection.
Caller must
+ * verify that the returned cnx is not null before using reference.
+ */
+ protected ClientCnx getCnxIfReady() {
+ if (getState() == State.Ready) {
+ return connectionHandler.cnx();
+ } else {
+ return null;
+ }
}
@Override
@@ -1793,8 +1808,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
last -> Math.max(last, getHighestSequenceId(op)));
}
- if (shouldWriteOpSendMsg()) {
- ClientCnx cnx = cnx();
+
+ final ClientCnx cnx = getCnxIfReady();
+ if (cnx != null) {
if (op.msg != null && op.msg.getSchemaState() == None) {
tryRegisterSchema(cnx, op.msg, op.callback,
this.connectionHandler.getEpoch());
return;
@@ -1817,16 +1833,6 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
- /**
- * Hook method for testing. By returning false, it's possible to prevent
messages
- * being delivered to the broker.
- *
- * @return true if OpSend messages should be written to open connection
- */
- protected boolean shouldWriteOpSendMsg() {
- return isConnected();
- }
-
// Must acquire a lock on ProducerImpl.this before calling method.
private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from,
long expectedEpoch) {
if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() ==
null) {