This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 37edaf238b4576950869e5f5622cf8ce20961dab
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Mar 7 17:33:48 2023 +0100

    [CEP-21] Add debounce to log replay
    
    patch by Alex Petrov; reviewed by Marcus Eriksson and Sam Tunnicliffe
    for CASSANDRA-18402
---
 .../cassandra/tcm/ClusterMetadataService.java      | 13 ++--
 .../org/apache/cassandra/tcm/RemoteProcessor.java  | 66 ++++++++++++++++++-
 .../org/apache/cassandra/tcm/DebounceTest.java     | 73 ++++++++++++++++++++++
 3 files changed, 147 insertions(+), 5 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 653f9c9b77..7978a58741 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -417,12 +417,17 @@ public class ClusterMetadataService
 
     public boolean maybeCatchup(Epoch theirEpoch)
     {
+        if (theirEpoch.isBefore(Epoch.FIRST))
+            return false;
+
         Epoch ourEpoch = ClusterMetadata.current().epoch;
-        if (!theirEpoch.isBefore(Epoch.FIRST) && theirEpoch.isAfter(ourEpoch))
+        while (theirEpoch.isAfter(ourEpoch))
         {
             if (state() == State.GOSSIP)
             {
-                logger.warn("TODO: can't catchup in gossip mode (their epoch = 
{})", theirEpoch); //todo: we have seen a message with epoch > EMPTY, we are 
probably racing with migration, or we missed the finish migration message, 
handle!
+                //TODO we have seen a message with epoch > EMPTY, we are 
probably racing with migration,
+                //     or we missed the finish migration message, handle!
+                logger.warn("Cannot catchup while in gossip mode (target epoch 
= {})", theirEpoch);
                 return false;
             }
 
@@ -430,8 +435,8 @@ public class ClusterMetadataService
             ourEpoch = ClusterMetadata.current().epoch;
             if (theirEpoch.isAfter(ourEpoch))
             {
-                throw new IllegalArgumentException(String.format("Could not 
catch up to epoch %s even after replay. Highest seen after replay is %s.",
-                                                                 theirEpoch, 
ourEpoch));
+                throw new IllegalStateException(String.format("Could not catch 
up to epoch %s even after replay. Highest seen after replay is %s.",
+                                                              theirEpoch, 
ourEpoch));
             }
             return true;
         }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java 
b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index ca6c56654f..db0310d1a1 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -24,14 +24,18 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.SequentialExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.FailureDetector;
@@ -46,7 +50,10 @@ import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
 
 import static org.apache.cassandra.net.NoPayload.noPayload;
@@ -57,11 +64,12 @@ public final class RemoteProcessor implements Processor
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteProcessor.class);
     private final Supplier<Collection<InetAddressAndPort>> discoveryNodes;
     private final LocalLog log;
-
+    private final Debounce<ClusterMetadata> replayAndWaitDebounced;
     RemoteProcessor(LocalLog log, Supplier<Collection<InetAddressAndPort>> 
discoveryNodes)
     {
         this.log = log;
         this.discoveryNodes = discoveryNodes;
+        this.replayAndWaitDebounced = new 
Debounce<ClusterMetadata>(this::replayAndWaitInternal);
     }
 
     @Override
@@ -112,6 +120,23 @@ public final class RemoteProcessor implements Processor
     @Override
     @SuppressWarnings("resource")
     public ClusterMetadata replayAndWait()
+    {
+        try
+        {
+            return replayAndWaitDebounced.getAsync().get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException("Can not replay during shutdown", e);
+        }
+        catch (ExecutionException e)
+        {
+            JVMStabilityInspector.inspectThrowable(e);
+            throw Throwables.throwAsUncheckedException(e);
+        }
+    }
+
+    private ClusterMetadata replayAndWaitInternal()
     {
         Epoch lastConsecutive = log.replayPersisted();
 
@@ -295,4 +320,43 @@ public final class RemoteProcessor implements Processor
             return endOfData();
         }
     }
+
+    public static class Debounce<T>
+    {
+        private final Callable<T> get;
+        private final AtomicReference<Future<T>> currentFuture = new 
AtomicReference<>();
+        private final SequentialExecutorPlus executor;
+
+        public Debounce(Callable<T> get)
+        {
+            this.executor = 
ExecutorFactory.Global.executorFactory().sequential("debounce");
+            this.get = get;
+        }
+
+        public Future<T> getAsync()
+        {
+            while (true)
+            {
+                Future<T> running = currentFuture.get();
+                // Anything that is done, however recent, is considered stale
+                if (running != null && !running.isDone())
+                    return running;
+
+                AsyncPromise<T> promise = new AsyncPromise<>();
+                if (currentFuture.compareAndSet(running, promise))
+                {
+                    executor.submit(() -> {
+                        try
+                        {
+                            promise.setSuccess(get.call());
+                        }
+                        catch (Throwable t)
+                        {
+                            promise.setFailure(t);
+                        }
+                    });
+                }
+            }
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/tcm/DebounceTest.java 
b/test/unit/org/apache/cassandra/tcm/DebounceTest.java
new file mode 100644
index 0000000000..064b598441
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/DebounceTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+public class DebounceTest
+{
+    @Test
+    public void testDebounce() throws Throwable
+    {
+        int threads = 20;
+        WaitQueue waitQueue = WaitQueue.newWaitQueue();
+        ExecutorPlus executor = 
ExecutorFactory.Global.executorFactory().pooled("debounce-test", threads);
+
+        for (int i = 0; i < 100; i++)
+        {
+            CountDownLatch latch = CountDownLatch.newCountDownLatch(threads);
+
+            AtomicInteger integer = new AtomicInteger();
+            RemoteProcessor.Debounce<Integer> debounce = new 
RemoteProcessor.Debounce<>(() -> {
+                integer.incrementAndGet();
+                return integer.get();
+            });
+
+            List<Future<Integer>> futures = new CopyOnWriteArrayList<>();
+            for (int j = 0; j < threads; j++)
+            {
+                executor.submit(() -> {
+                    latch.decrement();
+                    waitQueue.register().awaitUninterruptibly();
+                    futures.add(debounce.getAsync());
+                });
+            }
+
+            latch.awaitUninterruptibly();
+            waitQueue.signalAll();
+
+            while (futures.size() < threads)
+                Thread.sleep(10);
+
+            for (Future<Integer> future : futures)
+                Assert.assertEquals(1, (int) future.get());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to