This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 11426b3a PROTON-2599 Fix race that was causing some intermittent test
failures
11426b3a is described below
commit 11426b3ab3f182243bf627115da6f831ea4cca12
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Sep 8 18:26:53 2022 -0400
PROTON-2599 Fix race that was causing some intermittent test failures
---
.../qpid/protonj2/client/impl/ClientTrackable.java | 24 +++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
index 1bf4a4b8..61615852 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
@@ -20,6 +20,8 @@ package org.apache.qpid.protonj2.client.impl;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
@@ -30,14 +32,24 @@ import org.apache.qpid.protonj2.engine.OutgoingDelivery;
/**
* Base type used to provide some common plumbing for Tracker types
+ *
+ * @param <SenderType> The client sender type that created this tracker
+ * @param <TrackerType> The actual type of tracker that is being implemented
*/
public abstract class ClientTrackable<SenderType extends
ClientSenderLinkType<?>, TrackerType> {
protected final SenderType sender;
protected final OutgoingDelivery delivery;
+ @SuppressWarnings("rawtypes")
+ protected static final AtomicIntegerFieldUpdater<ClientTrackable>
REMOTELY_SETTLED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClientTrackable.class,
"remotelySettled");
+ @SuppressWarnings("rawtypes")
+ protected static final AtomicReferenceFieldUpdater<ClientTrackable,
DeliveryState> REMOTEL_DELIVERY_STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ClientTrackable.class,
DeliveryState.class, "remoteDeliveryState");
+
private ClientFuture<TrackerType> remoteSettlementFuture;
- private volatile boolean remotelySettled;
+ private volatile int remotelySettled;
private volatile DeliveryState remoteDeliveryState;
/**
@@ -69,7 +81,7 @@ public abstract class ClientTrackable<SenderType extends
ClientSenderLinkType<?>
}
public boolean remoteSettled() {
- return remotelySettled;
+ return remotelySettled > 0;
}
public TrackerType disposition(DeliveryState state, boolean settle) throws
ClientException {
@@ -199,10 +211,10 @@ public abstract class ClientTrackable<SenderType extends
ClientSenderLinkType<?>
//----- Internal Event hooks for delivery updates
private void processDeliveryUpdated(OutgoingDelivery delivery) {
- remotelySettled = delivery.isRemotelySettled();
- remoteDeliveryState =
ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-
if (delivery.isRemotelySettled()) {
+ REMOTELY_SETTLED_UPDATER.lazySet(this, 1);
+ REMOTEL_DELIVERY_STATE_UPDATER.lazySet(this,
ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
+
if (sender.options.autoSettle()) {
delivery.settle();
}
@@ -212,6 +224,8 @@ public abstract class ClientTrackable<SenderType extends
ClientSenderLinkType<?>
remoteSettlementFuture.complete(self());
}
}
+ } else {
+ REMOTEL_DELIVERY_STATE_UPDATER.set(this,
ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]