This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 37edaf238b4576950869e5f5622cf8ce20961dab
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Mar 7 17:33:48 2023 +0100

    [CEP-21] Add debounce to log replay
    
    patch by Alex Petrov; reviewed by Marcus Eriksson and Sam Tunnicliffe for CASSANDRA-18402
---
 .../cassandra/tcm/ClusterMetadataService.java      | 13 ++--
 .../org/apache/cassandra/tcm/RemoteProcessor.java  | 89 +++++++++++++++++++++
 .../org/apache/cassandra/tcm/DebounceTest.java     | 95 ++++++++++++++++++++++
 3 files changed, 193 insertions(+), 4 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 653f9c9b77..7978a58741 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -417,12 +417,17 @@ public class ClusterMetadataService
 
     public boolean maybeCatchup(Epoch theirEpoch)
     {
+        if (theirEpoch.isBefore(Epoch.FIRST))
+            return false;
+
         Epoch ourEpoch = ClusterMetadata.current().epoch;
-        if (!theirEpoch.isBefore(Epoch.FIRST) && theirEpoch.isAfter(ourEpoch))
+        while (theirEpoch.isAfter(ourEpoch))
         {
             if (state() == State.GOSSIP)
             {
-                logger.warn("TODO: can't catchup in gossip mode (their epoch = {})", theirEpoch); //todo: we have seen a message with epoch > EMPTY, we are probably racing with migration, or we missed the finish migration message, handle!
+                //TODO we have seen a message with epoch > EMPTY, we are probably racing with migration,
+                //     or we missed the finish migration message, handle!
+                logger.warn("Cannot catchup while in gossip mode (target epoch = {})", theirEpoch);
                 return false;
             }
 
@@ -430,8 +435,8 @@ public class ClusterMetadataService
             ourEpoch = ClusterMetadata.current().epoch;
             if (theirEpoch.isAfter(ourEpoch))
             {
-                throw new IllegalArgumentException(String.format("Could not catch up to epoch %s even after replay. Highest seen after replay is %s.",
-                                                                 theirEpoch, ourEpoch));
+                throw new IllegalStateException(String.format("Could not catch up to epoch %s even after replay. Highest seen after replay is %s.",
+                                                              theirEpoch, ourEpoch));
             }
             return true;
         }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index ca6c56654f..db0310d1a1 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -24,14 +24,18 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.SequentialExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.FailureDetector;
@@ -46,7 +50,10 @@ import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
 
 import static org.apache.cassandra.net.NoPayload.noPayload;
@@ -57,11 +64,13 @@ public final class RemoteProcessor implements Processor
     private static final Logger logger = LoggerFactory.getLogger(RemoteProcessor.class);
     private final Supplier<Collection<InetAddressAndPort>> discoveryNodes;
     private final LocalLog log;
+    private final Debounce<ClusterMetadata> replayAndWaitDebounced;
 
     RemoteProcessor(LocalLog log, Supplier<Collection<InetAddressAndPort>> discoveryNodes)
     {
         this.log = log;
         this.discoveryNodes = discoveryNodes;
+        this.replayAndWaitDebounced = new Debounce<>(this::replayAndWaitInternal);
     }
 
     @Override
@@ -112,6 +121,26 @@ public final class RemoteProcessor implements Processor
     @Override
     @SuppressWarnings("resource")
     public ClusterMetadata replayAndWait()
+    {
+        try
+        {
+            return replayAndWaitDebounced.getAsync().get();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Can not replay during shutdown", e);
+        }
+        catch (ExecutionException e)
+        {
+            // Surface the failure of the replay itself rather than the ExecutionException wrapper
+            Throwable cause = e.getCause() == null ? e : e.getCause();
+            JVMStabilityInspector.inspectThrowable(cause);
+            throw Throwables.throwAsUncheckedException(cause);
+        }
+    }
+
+    private ClusterMetadata replayAndWaitInternal()
     {
         Epoch lastConsecutive = log.replayPersisted();
 
@@ -295,4 +324,64 @@ public final class RemoteProcessor implements Processor
             return endOfData();
         }
     }
+
+    /**
+     * Coalesces concurrent requests for the result of an expensive computation. Callers arriving while an
+     * invocation is in flight share its future; callers arriving after it has completed trigger a fresh
+     * invocation rather than reusing the completed result. Invocations run on a dedicated sequential executor.
+     */
+    public static class Debounce<T>
+    {
+        private final Callable<T> get;
+        private final AtomicReference<Future<T>> currentFuture = new AtomicReference<>();
+        private final SequentialExecutorPlus executor;
+
+        public Debounce(Callable<T> get)
+        {
+            this.executor = ExecutorFactory.Global.executorFactory().sequential("debounce");
+            this.get = get;
+        }
+
+        /**
+         * @return the future of the in-flight invocation if there is one, otherwise the future of a
+         *         newly scheduled invocation
+         */
+        public Future<T> getAsync()
+        {
+            while (true)
+            {
+                Future<T> running = currentFuture.get();
+                // Anything that is done, however recent, is considered stale
+                if (running != null && !running.isDone())
+                    return running;
+
+                AsyncPromise<T> promise = new AsyncPromise<>();
+                if (currentFuture.compareAndSet(running, promise))
+                {
+                    try
+                    {
+                        executor.execute(() -> {
+                            try
+                            {
+                                promise.setSuccess(get.call());
+                            }
+                            catch (Throwable t)
+                            {
+                                promise.setFailure(t);
+                            }
+                        });
+                    }
+                    catch (Throwable t)
+                    {
+                        // e.g. rejected during shutdown: fail the promise so callers sharing it do not hang
+                        promise.tryFailure(t);
+                    }
+                    // We installed this promise, so return it directly: looping to re-read currentFuture
+                    // could find it already done and schedule a second, redundant invocation
+                    return promise;
+                }
+                // Lost the race to a concurrent caller; loop to pick up the promise it installed
+            }
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/tcm/DebounceTest.java b/test/unit/org/apache/cassandra/tcm/DebounceTest.java
new file mode 100644
index 0000000000..064b598441
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/DebounceTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+public class DebounceTest
+{
+    /**
+     * Releases many threads at once to request the same debounced value and checks that they all share a
+     * single invocation of the underlying computation, and that a request made after that invocation has
+     * completed triggers a fresh one.
+     */
+    @Test
+    public void testDebounce() throws Throwable
+    {
+        int threads = 20;
+        WaitQueue waitQueue = WaitQueue.newWaitQueue();
+        ExecutorPlus executor = ExecutorFactory.Global.executorFactory().pooled("debounce-test", threads);
+        try
+        {
+            for (int i = 0; i < 100; i++)
+            {
+                CountDownLatch ready = CountDownLatch.newCountDownLatch(threads);
+                // Keeps the computation in flight until every thread holds its future, so no caller can
+                // find it already completed and (correctly) start a second invocation
+                CountDownLatch allRequested = CountDownLatch.newCountDownLatch(1);
+
+                AtomicInteger invocations = new AtomicInteger();
+                RemoteProcessor.Debounce<Integer> debounce = new RemoteProcessor.Debounce<>(() -> {
+                    allRequested.awaitUninterruptibly();
+                    return invocations.incrementAndGet();
+                });
+
+                List<Future<Integer>> futures = new CopyOnWriteArrayList<>();
+                for (int j = 0; j < threads; j++)
+                {
+                    executor.submit(() -> {
+                        // Register before counting down: otherwise signalAll() may run before this thread
+                        // starts waiting, leaving it blocked forever and the test hanging
+                        WaitQueue.Signal signal = waitQueue.register();
+                        ready.decrement();
+                        signal.awaitUninterruptibly();
+                        futures.add(debounce.getAsync());
+                    });
+                }
+
+                ready.awaitUninterruptibly();
+                waitQueue.signalAll();
+
+                while (futures.size() < threads)
+                    Thread.sleep(10);
+                allRequested.decrement();
+
+                for (Future<Integer> future : futures)
+                    Assert.assertEquals(1, (int) future.get());
+                Assert.assertEquals(1, invocations.get());
+
+                // The shared invocation has completed, so the next request must trigger a new one
+                Assert.assertEquals(2, (int) debounce.getAsync().get());
+            }
+        }
+        finally
+        {
+            executor.shutdownNow();
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
