This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new a8f78552 Accord's ConfigService lock is held over large areas which
cause deadlocks and performance issues (#136)
a8f78552 is described below
commit a8f78552b958a9d9b6fc9f2840753001cd1f6004
Author: dcapwell <[email protected]>
AuthorDate: Tue Nov 12 11:57:58 2024 -0800
Accord's ConfigService lock is held over large areas which cause deadlocks
and performance issues (#136)
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-20065
---
.../main/java/accord/api/ConfigurationService.java | 12 ++
.../accord/impl/AbstractConfigurationService.java | 148 +++++++++++++--------
.../main/java/accord/utils/async/AsyncResults.java | 6 +
3 files changed, 108 insertions(+), 58 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java
b/accord-core/src/main/java/accord/api/ConfigurationService.java
index cfe52546..41624d17 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -103,6 +103,18 @@ public interface ConfigurationService
{
return new EpochReady(epoch, DONE, DONE, DONE, DONE);
}
+
+ @Override
+ public String toString()
+ {
+ return "EpochReady{" +
+ "epoch=" + epoch +
+ ", metadata=" + metadata +
+ ", fastPath=" + fastPath +
+ ", data=" + data +
+ ", reads=" + reads +
+ '}';
+ }
}
diff --git
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 7edc3114..1a243e94 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -20,6 +20,9 @@ package accord.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
@@ -45,15 +48,16 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
protected final EpochHistory epochs = createEpochHistory();
- protected final List<Listener> listeners = new ArrayList<>();
+ protected final List<Listener> listeners = new CopyOnWriteArrayList<>();
public abstract static class AbstractEpochState
{
protected final long epoch;
protected final AsyncResult.Settable<Topology> received =
AsyncResults.settable();
protected final AsyncResult.Settable<Void> acknowledged =
AsyncResults.settable();
+ @GuardedBy("AbstractEpochHistory.this")
protected AsyncResult<Void> reads = null;
-
+ @GuardedBy("AbstractEpochHistory.this")
protected Topology topology = null;
public AbstractEpochState(long epoch)
@@ -74,7 +78,10 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
}
/**
- * Access needs to be synchronized by the parent ConfigurationService class
+ * Thread safety is managed by a synchronized lock on this object, and
extending classes can use the same lock when needed.
+ *
+ * There is a special case when the last received/acknowledged epochs are
needed: they can be accessed without a lock
+ * and provide a happens-before relationship (if lastReceived=42, epoch 42
exists and is set up)
*/
@VisibleForTesting
public abstract static class AbstractEpochHistory<EpochState extends
AbstractEpochState>
@@ -82,30 +89,46 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
// TODO (low priority): move pendingEpochs / FetchTopology into here?
private List<EpochState> epochs = new ArrayList<>();
- protected long lastReceived = 0;
- protected long lastAcknowledged = 0;
+ /**
+ * {@link #lastReceived} and {@link #lastAcknowledged} help define the
visibility by making sure that the update is the very last thing done in their
respective logic.
+ * This means that the read of these fields can be done without a lock
and can be used to tell the caller that the given epoch has reached a given
step.
+ *
+ * The reason that these fields are private is to help enforce the
thread safety of this class.
+ */
+ private volatile long lastReceived = 0;
+ private volatile long lastAcknowledged = 0;
protected abstract EpochState createEpochState(long epoch);
- public long minEpoch()
+ public long lastReceived()
+ {
+ return lastReceived;
+ }
+
+ public long lastAcknowledged()
+ {
+ return lastAcknowledged;
+ }
+
+ public synchronized long minEpoch()
{
return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
}
- public long maxEpoch()
+ public synchronized long maxEpoch()
{
int size = epochs.size();
return size == 0 ? 0L : epochs.get(size - 1).epoch;
}
@VisibleForTesting
- EpochState atIndex(int idx)
+ synchronized EpochState atIndex(int idx)
{
return epochs.get(idx);
}
@VisibleForTesting
- int size()
+ synchronized int size()
{
return epochs.size();
}
@@ -115,7 +138,7 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
return lastReceived == 0;
}
- EpochState getOrCreate(long epoch)
+ synchronized EpochState getOrCreate(long epoch)
{
Invariants.checkArgument(epoch > 0, "Epoch must be positive but
given %d", epoch);
if (epochs.isEmpty())
@@ -149,13 +172,18 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
public void receive(Topology topology)
{
- long epoch = topology.epoch();
- logger.debug("Receiving epoch {}", epoch);
- Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 ||
lastReceived == 0,
- "Epoch %d != %d + 1", epoch, lastReceived);
- lastReceived = epoch;
- EpochState state = getOrCreate(epoch);
- state.topology = topology;
+ EpochState state;
+ synchronized (this)
+ {
+ long epoch = topology.epoch();
+ logger.debug("Receiving epoch {}", epoch);
+ Invariants.checkState(lastReceived == epoch - 1 || epoch == 0
|| lastReceived == 0,
+ "Epoch %d != %d + 1", epoch,
lastReceived);
+ state = getOrCreate(epoch);
+ state.topology = topology;
+ lastReceived = epoch;
+ }
+ // avoid resolving the future while holding the lock, as the
callbacks get called in-line
state.received.setSuccess(topology);
}
@@ -169,25 +197,35 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
return getOrCreate(epoch).topology;
}
+ Topology topologyForLastReceived()
+ {
+ return getOrCreate(lastReceived).topology;
+ }
+
public void acknowledge(EpochReady ready)
{
- long epoch = ready.epoch;
- logger.debug("Acknowledging epoch {}", epoch);
- Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0
|| lastAcknowledged == 0,
- "Epoch %d != %d + 1", epoch,
lastAcknowledged);
- lastAcknowledged = epoch;
- EpochState state = getOrCreate(epoch);
- Invariants.checkState(state.reads == null, "Reads result was
already set for epoch", epoch);
- state.reads = ready.reads;
+ EpochState state;
+ synchronized (this)
+ {
+ long epoch = ready.epoch;
+ logger.debug("Acknowledging epoch {}", epoch);
+ Invariants.checkState(lastAcknowledged == epoch - 1 || epoch
== 0 || lastAcknowledged == 0,
+ "Epoch %d != %d + 1", epoch,
lastAcknowledged);
+ state = getOrCreate(epoch);
+ Invariants.checkState(state.reads == null, "Reads result was
already set for epoch", epoch);
+ state.reads = ready.reads;
+ lastAcknowledged = epoch;
+ }
+ // avoid resolving the future while holding the lock, as the
callbacks get called in-line
state.acknowledged.setSuccess(null);
}
- AsyncResult<Void> acknowledgeFuture(long epoch)
+ public AsyncResult<Void> acknowledgeFuture(long epoch)
{
return getOrCreate(epoch).acknowledged;
}
- void truncateUntil(long epoch)
+ synchronized void truncateUntil(long epoch)
{
Invariants.checkArgument(epoch <= maxEpoch(), "epoch %d > %d",
epoch, maxEpoch());
long minEpoch = minEpoch();
@@ -206,44 +244,48 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
protected abstract EpochHistory createEpochHistory();
- protected synchronized EpochState getOrCreateEpochState(long epoch)
+ protected EpochState getOrCreateEpochState(long epoch)
{
return epochs.getOrCreate(epoch);
}
@Override
- public synchronized void registerListener(Listener listener)
+ public void registerListener(Listener listener)
{
listeners.add(listener);
}
- public synchronized boolean isEmpty()
+ public boolean isEmpty()
{
return epochs.isEmpty();
}
@Override
- public synchronized Topology currentTopology()
+ public Topology currentTopology()
{
- return epochs.topologyFor(epochs.lastReceived);
+ return epochs.topologyForLastReceived();
}
@Override
- public synchronized Topology getTopologyForEpoch(long epoch)
+ public Topology getTopologyForEpoch(long epoch)
{
return epochs.topologyFor(epoch);
}
protected abstract void fetchTopologyInternal(long epoch);
+ @GuardedBy("this")
private long maxRequestedEpoch;
@Override
- public synchronized void fetchTopologyForEpoch(long epoch)
+ public void fetchTopologyForEpoch(long epoch)
{
- if (epoch <= maxRequestedEpoch)
- return;
+ synchronized (this)
+ {
+ if (epoch <= maxRequestedEpoch)
+ return;
- maxRequestedEpoch = epoch;
+ maxRequestedEpoch = epoch;
+ }
fetchTopologyInternal(epoch);
}
@@ -252,26 +294,16 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
@Override
public void acknowledgeEpoch(EpochReady ready, boolean startSync)
{
- ready.metadata.addCallback(() -> {
- synchronized (AbstractConfigurationService.this)
- {
- epochs.acknowledge(ready);
- }
- });
- ready.fastPath.addCallback(() -> {
- synchronized (AbstractConfigurationService.this)
- {
- localSyncComplete(epochs.getOrCreate(ready.epoch).topology,
startSync);
- }
- });
+ ready.metadata.addCallback(() -> epochs.acknowledge(ready));
+ ready.fastPath.addCallback(() ->
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
}
protected void topologyUpdatePreListenerNotify(Topology topology) {}
protected void topologyUpdatePostListenerNotify(Topology topology) {}
- public synchronized void reportTopology(Topology topology, boolean isLoad,
boolean startSync)
+ public void reportTopology(Topology topology, boolean isLoad, boolean
startSync)
{
- long lastReceived = epochs.lastReceived;
+ long lastReceived = epochs.lastReceived();
if (topology.epoch() <= lastReceived)
return;
@@ -283,7 +315,7 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
return;
}
- long lastAcked = epochs.lastAcknowledged;
+ long lastAcked = epochs.lastAcknowledged();
if (lastAcked == 0 && lastReceived > 0)
{
logger.debug("Epoch {} received; waiting for {} to ack before
reporting", topology.epoch(), epochs.minEpoch());
@@ -304,27 +336,27 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
topologyUpdatePostListenerNotify(topology);
}
- public synchronized void reportTopology(Topology topology)
+ public void reportTopology(Topology topology)
{
reportTopology(topology, false, true);
}
protected void receiveRemoteSyncCompletePreListenerNotify(Node.Id node,
long epoch) {}
- public synchronized void receiveRemoteSyncComplete(Node.Id node, long
epoch)
+ public void receiveRemoteSyncComplete(Node.Id node, long epoch)
{
receiveRemoteSyncCompletePreListenerNotify(node, epoch);
for (Listener listener : listeners)
listener.onRemoteSyncComplete(node, epoch);
}
- public synchronized void receiveClosed(Ranges ranges, long epoch)
+ public void receiveClosed(Ranges ranges, long epoch)
{
for (Listener listener : listeners)
listener.onEpochClosed(ranges, epoch);
}
- public synchronized void receiveRedundant(Ranges ranges, long epoch)
+ public void receiveRedundant(Ranges ranges, long epoch)
{
for (Listener listener : listeners)
listener.onEpochRedundant(ranges, epoch);
@@ -333,7 +365,7 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
protected void truncateTopologiesPreListenerNotify(long epoch) {}
protected void truncateTopologiesPostListenerNotify(long epoch) {}
- public synchronized void truncateTopologiesUntil(long epoch)
+ public void truncateTopologiesUntil(long epoch)
{
truncateTopologiesPreListenerNotify(epoch);
for (Listener listener : listeners)
@@ -342,7 +374,7 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
epochs.truncateUntil(epoch);
}
- public synchronized AsyncChain<Void> epochReady(long epoch)
+ public AsyncChain<Void> epochReady(long epoch)
{
EpochState state = epochs.getOrCreate(epoch);
if (state.reads != null)
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 9406781f..5f7a74b2 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -328,6 +328,12 @@ public class AsyncResults
{
return failure == null;
}
+
+ @Override
+ public String toString()
+ {
+ return "Immediate{" + (isSuccess() ? "success" : "failure") + "}";
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]