This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8f78552 Accord's ConfigService lock is held over large areas which 
cause deadlocks and performance issues (#136)
a8f78552 is described below

commit a8f78552b958a9d9b6fc9f2840753001cd1f6004
Author: dcapwell <[email protected]>
AuthorDate: Tue Nov 12 11:57:58 2024 -0800

    Accord's ConfigService lock is held over large areas which cause deadlocks 
and performance issues (#136)
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for 
CASSANDRA-20065
---
 .../main/java/accord/api/ConfigurationService.java |  12 ++
 .../accord/impl/AbstractConfigurationService.java  | 148 +++++++++++++--------
 .../main/java/accord/utils/async/AsyncResults.java |   6 +
 3 files changed, 108 insertions(+), 58 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java 
b/accord-core/src/main/java/accord/api/ConfigurationService.java
index cfe52546..41624d17 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -103,6 +103,18 @@ public interface ConfigurationService
         {
             return new EpochReady(epoch, DONE, DONE, DONE, DONE);
         }
+
+        @Override
+        public String toString()
+        {
+            return "EpochReady{" +
+                   "epoch=" + epoch +
+                   ", metadata=" + metadata +
+                   ", fastPath=" + fastPath +
+                   ", data=" + data +
+                   ", reads=" + reads +
+                   '}';
+        }
     }
 
 
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 7edc3114..1a243e94 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -20,6 +20,9 @@ package accord.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
@@ -45,15 +48,16 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
 
     protected final EpochHistory epochs = createEpochHistory();
 
-    protected final List<Listener> listeners = new ArrayList<>();
+    protected final List<Listener> listeners = new CopyOnWriteArrayList<>();
 
     public abstract static class AbstractEpochState
     {
         protected final long epoch;
         protected final AsyncResult.Settable<Topology> received = 
AsyncResults.settable();
         protected final AsyncResult.Settable<Void> acknowledged = 
AsyncResults.settable();
+        @GuardedBy("AbstractEpochHistory.this")
         protected AsyncResult<Void> reads = null;
-
+        @GuardedBy("AbstractEpochHistory.this")
         protected Topology topology = null;
 
         public AbstractEpochState(long epoch)
@@ -74,7 +78,10 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
     }
 
     /**
-     * Access needs to be synchronized by the parent ConfigurationService class
+     * Thread safety is managed by a synchronized lock on this object, and 
extending classes can use the same lock when needed.
+     *
+     * There is a special case when the last received/acknowledged epochs are 
needed, they can be accessed without a lock
+     * and provide a happens-before relationship (if lastReceived=42, epoch 42 
exists and is set up)
      */
     @VisibleForTesting
     public abstract static class AbstractEpochHistory<EpochState extends 
AbstractEpochState>
@@ -82,30 +89,46 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         // TODO (low priority): move pendingEpochs / FetchTopology into here?
         private List<EpochState> epochs = new ArrayList<>();
 
-        protected long lastReceived = 0;
-        protected long lastAcknowledged = 0;
+        /**
+         * {@link #lastReceived} and {@link #lastAcknowledged} help define the 
visibility by making sure that the update is the very last thing done in their 
respective logic.
+         * This means that the read of these fields can be done without a lock 
and can be used to tell the caller that the given epoch has reached a given 
step.
+         *
+         * The reason that these fields are private is to help enforce the 
thread safety of this class.
+         */
+        private volatile long lastReceived = 0;
+        private volatile long lastAcknowledged = 0;
 
         protected abstract EpochState createEpochState(long epoch);
 
-        public long minEpoch()
+        public long lastReceived()
+        {
+            return lastReceived;
+        }
+
+        public long lastAcknowledged()
+        {
+            return lastAcknowledged;
+        }
+
+        public synchronized long minEpoch()
         {
             return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
         }
 
-        public long maxEpoch()
+        public synchronized long maxEpoch()
         {
             int size = epochs.size();
             return size == 0 ? 0L : epochs.get(size - 1).epoch;
         }
 
         @VisibleForTesting
-        EpochState atIndex(int idx)
+        synchronized EpochState atIndex(int idx)
         {
             return epochs.get(idx);
         }
 
         @VisibleForTesting
-        int size()
+        synchronized int size()
         {
             return epochs.size();
         }
@@ -115,7 +138,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return lastReceived == 0;
         }
 
-        EpochState getOrCreate(long epoch)
+        synchronized EpochState getOrCreate(long epoch)
         {
             Invariants.checkArgument(epoch > 0, "Epoch must be positive but 
given %d", epoch);
             if (epochs.isEmpty())
@@ -149,13 +172,18 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
 
         public void receive(Topology topology)
         {
-            long epoch = topology.epoch();
-            logger.debug("Receiving epoch {}", epoch);
-            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || 
lastReceived == 0,
-                                  "Epoch %d != %d + 1", epoch, lastReceived);
-            lastReceived = epoch;
-            EpochState state = getOrCreate(epoch);
-            state.topology = topology;
+            EpochState state;
+            synchronized (this)
+            {
+                long epoch = topology.epoch();
+                logger.debug("Receiving epoch {}", epoch);
+                Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 
|| lastReceived == 0,
+                                      "Epoch %d != %d + 1", epoch, 
lastReceived);
+                state = getOrCreate(epoch);
+                state.topology = topology;
+                lastReceived = epoch;
+            }
+            // avoid resolving the future while holding the lock, as the 
callbacks get called in-line
             state.received.setSuccess(topology);
         }
 
@@ -169,25 +197,35 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return getOrCreate(epoch).topology;
         }
 
+        Topology topologyForLastReceived()
+        {
+            return getOrCreate(lastReceived).topology;
+        }
+
         public void acknowledge(EpochReady ready)
         {
-            long epoch = ready.epoch;
-            logger.debug("Acknowledging epoch {}", epoch);
-            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 
|| lastAcknowledged == 0,
-                                  "Epoch %d != %d + 1", epoch, 
lastAcknowledged);
-            lastAcknowledged = epoch;
-            EpochState state = getOrCreate(epoch);
-            Invariants.checkState(state.reads == null, "Reads result was 
already set for epoch", epoch);
-            state.reads = ready.reads;
+            EpochState state;
+            synchronized (this)
+            {
+                long epoch = ready.epoch;
+                logger.debug("Acknowledging epoch {}", epoch);
+                Invariants.checkState(lastAcknowledged == epoch - 1 || epoch 
== 0 || lastAcknowledged == 0,
+                                      "Epoch %d != %d + 1", epoch, 
lastAcknowledged);
+                state = getOrCreate(epoch);
+                Invariants.checkState(state.reads == null, "Reads result was 
already set for epoch", epoch);
+                state.reads = ready.reads;
+                lastAcknowledged = epoch;
+            }
+            // avoid resolving the future while holding the lock, as the 
callbacks get called in-line
             state.acknowledged.setSuccess(null);
         }
 
-        AsyncResult<Void> acknowledgeFuture(long epoch)
+        public AsyncResult<Void> acknowledgeFuture(long epoch)
         {
             return getOrCreate(epoch).acknowledged;
         }
 
-        void truncateUntil(long epoch)
+        synchronized void truncateUntil(long epoch)
         {
             Invariants.checkArgument(epoch <= maxEpoch(), "epoch %d > %d", 
epoch, maxEpoch());
             long minEpoch = minEpoch();
@@ -206,44 +244,48 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
 
     protected abstract EpochHistory createEpochHistory();
 
-    protected synchronized EpochState getOrCreateEpochState(long epoch)
+    protected EpochState getOrCreateEpochState(long epoch)
     {
         return epochs.getOrCreate(epoch);
     }
 
     @Override
-    public synchronized void registerListener(Listener listener)
+    public void registerListener(Listener listener)
     {
         listeners.add(listener);
     }
 
-    public synchronized boolean isEmpty()
+    public boolean isEmpty()
     {
         return epochs.isEmpty();
     }
 
     @Override
-    public synchronized Topology currentTopology()
+    public Topology currentTopology()
     {
-        return epochs.topologyFor(epochs.lastReceived);
+        return epochs.topologyForLastReceived();
     }
 
     @Override
-    public synchronized Topology getTopologyForEpoch(long epoch)
+    public Topology getTopologyForEpoch(long epoch)
     {
         return epochs.topologyFor(epoch);
     }
 
     protected abstract void fetchTopologyInternal(long epoch);
 
+    @GuardedBy("this")
     private long maxRequestedEpoch;
     @Override
-    public synchronized void fetchTopologyForEpoch(long epoch)
+    public void fetchTopologyForEpoch(long epoch)
     {
-        if (epoch <= maxRequestedEpoch)
-            return;
+        synchronized (this)
+        {
+            if (epoch <= maxRequestedEpoch)
+                return;
 
-        maxRequestedEpoch = epoch;
+            maxRequestedEpoch = epoch;
+        }
         fetchTopologyInternal(epoch);
     }
 
@@ -252,26 +294,16 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
     @Override
     public void acknowledgeEpoch(EpochReady ready, boolean startSync)
     {
-        ready.metadata.addCallback(() -> {
-            synchronized (AbstractConfigurationService.this)
-            {
-                epochs.acknowledge(ready);
-            }
-        });
-        ready.fastPath.addCallback(() ->  {
-            synchronized (AbstractConfigurationService.this)
-            {
-                localSyncComplete(epochs.getOrCreate(ready.epoch).topology, 
startSync);
-            }
-        });
+        ready.metadata.addCallback(() -> epochs.acknowledge(ready));
+        ready.fastPath.addCallback(() ->  
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
     }
 
     protected void topologyUpdatePreListenerNotify(Topology topology) {}
     protected void topologyUpdatePostListenerNotify(Topology topology) {}
 
-    public synchronized void reportTopology(Topology topology, boolean isLoad, 
boolean startSync)
+    public void reportTopology(Topology topology, boolean isLoad, boolean 
startSync)
     {
-        long lastReceived = epochs.lastReceived;
+        long lastReceived = epochs.lastReceived();
         if (topology.epoch() <= lastReceived)
             return;
 
@@ -283,7 +315,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return;
         }
 
-        long lastAcked = epochs.lastAcknowledged;
+        long lastAcked = epochs.lastAcknowledged();
         if (lastAcked == 0 && lastReceived > 0)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), epochs.minEpoch());
@@ -304,27 +336,27 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         topologyUpdatePostListenerNotify(topology);
     }
 
-    public synchronized void reportTopology(Topology topology)
+    public void reportTopology(Topology topology)
     {
         reportTopology(topology, false, true);
     }
 
     protected void receiveRemoteSyncCompletePreListenerNotify(Node.Id node, 
long epoch) {}
 
-    public synchronized void receiveRemoteSyncComplete(Node.Id node, long 
epoch)
+    public void receiveRemoteSyncComplete(Node.Id node, long epoch)
     {
         receiveRemoteSyncCompletePreListenerNotify(node, epoch);
         for (Listener listener : listeners)
             listener.onRemoteSyncComplete(node, epoch);
     }
 
-    public synchronized void receiveClosed(Ranges ranges, long epoch)
+    public void receiveClosed(Ranges ranges, long epoch)
     {
         for (Listener listener : listeners)
             listener.onEpochClosed(ranges, epoch);
     }
 
-    public synchronized void receiveRedundant(Ranges ranges, long epoch)
+    public void receiveRedundant(Ranges ranges, long epoch)
     {
         for (Listener listener : listeners)
             listener.onEpochRedundant(ranges, epoch);
@@ -333,7 +365,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
     protected void truncateTopologiesPreListenerNotify(long epoch) {}
     protected void truncateTopologiesPostListenerNotify(long epoch) {}
 
-    public synchronized void truncateTopologiesUntil(long epoch)
+    public void truncateTopologiesUntil(long epoch)
     {
         truncateTopologiesPreListenerNotify(epoch);
         for (Listener listener : listeners)
@@ -342,7 +374,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         epochs.truncateUntil(epoch);
     }
 
-    public synchronized AsyncChain<Void> epochReady(long epoch)
+    public AsyncChain<Void> epochReady(long epoch)
     {
         EpochState state = epochs.getOrCreate(epoch);
         if (state.reads != null)
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java 
b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 9406781f..5f7a74b2 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -328,6 +328,12 @@ public class AsyncResults
         {
             return failure == null;
         }
+
+        @Override
+        public String toString()
+        {
+            return "Immediate{" + (isSuccess() ? "success" : "failure") + "}";
+        }
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to