This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new b68606a [Java Client] Fix concurrency issue in incrementing epoch (#10278) (#10436)
b68606a is described below
commit b68606a2c38323ead2fe2524a97507b3a1654659
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Apr 29 14:50:10 2021 +0300
[Java Client] Fix concurrency issue in incrementing epoch (#10278) (#10436)
---
.../org/apache/pulsar/client/impl/ConnectionHandler.java | 14 +++++++++++---
.../java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +-
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index a9a8f7c..8802178 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import com.google.common.annotations.VisibleForTesting;
@@ -35,7 +37,9 @@ public class ConnectionHandler {
protected final HandlerState state;
protected final Backoff backoff;
- protected long epoch = 0L;
+ private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(ConnectionHandler.class, "epoch");
+ private volatile long epoch = 0L;
protected volatile long lastConnectionClosedTimestamp = 0L;
interface Connection {
@@ -104,11 +108,15 @@ public class ConnectionHandler {
state.setState(State.Connecting);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
- ++epoch;
+ incrementEpoch();
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}
+ protected long incrementEpoch() {
+ return EPOCH_UPDATER.incrementAndGet(this);
+ }
+
@VisibleForTesting
public void connectionClosed(ClientCnx cnx) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
@@ -124,7 +132,7 @@ public class ConnectionHandler {
delayMs / 1000.0);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
- ++epoch;
+ incrementEpoch();
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 528efca..786af84 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1287,7 +1287,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId,
producerName, conf.isEncryptionEnabled(), metadata,
- schemaInfo, connectionHandler.epoch, userProvidedProducerName),
+ schemaInfo, connectionHandler.getEpoch(), userProvidedProducerName),
requestId).thenAccept(response -> {
String producerName = response.getProducerName();
long lastSequenceId = response.getLastSequenceId();