This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new cbf4dcb334 Enable EpochAwareDebounce to cancel in flight rpc requests
cbf4dcb334 is described below
commit cbf4dcb3345c7e2f42f6a897c66b6460b7acc2ca
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Apr 12 14:04:06 2024 +0100
Enable EpochAwareDebounce to cancel in flight rpc requests
Patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson
for CASSANDRA-19514
---
.../apache/cassandra/tcm/EpochAwareDebounce.java | 65 ++++++++--
.../org/apache/cassandra/tcm/PeerLogFetcher.java | 19 +--
.../org/apache/cassandra/tcm/RemoteProcessor.java | 38 ++++--
.../cassandra/distributed/impl/Instance.java | 5 +-
.../distributed/test/log/CMSCatchupTest.java | 71 +++++++++++
.../test/log/FetchLogFromPeers2Test.java | 134 ++++++++++++++++++++
.../test/log/FetchLogFromPeersTest.java | 139 +--------------------
7 files changed, 306 insertions(+), 165 deletions(-)
diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index 5621845487..f65c03d830 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -18,13 +18,23 @@
package org.apache.cassandra.tcm;
-import java.util.concurrent.Callable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
/**
 * When debouncing from a replica we know exactly which epoch we need, so to avoid retries we
@@ -32,12 +42,13 @@ import org.apache.cassandra.utils.concurrent.Future;
 * comes in, we create a new future. If a request for a newer epoch comes in, we simply
 * swap out the current future reference for a new one which is requesting the newer epoch.
 */
-public class EpochAwareDebounce<T>
+public class EpochAwareDebounce
{
-    public static final EpochAwareDebounce<ClusterMetadata> instance = new EpochAwareDebounce<>();
-
-    private final AtomicReference<EpochAwareAsyncPromise<T>> currentFuture = new AtomicReference<>();
+    private static final Logger logger = LoggerFactory.getLogger(EpochAwareDebounce.class);
+    public static final EpochAwareDebounce instance = new EpochAwareDebounce();
+    private final AtomicReference<EpochAwareAsyncPromise> currentFuture = new AtomicReference<>();
    private final ExecutorPlus executor;
+    private final List<Promise<LogState>> inflightRequests = new CopyOnWriteArrayList<>();

    private EpochAwareDebounce()
    {
@@ -45,24 +56,50 @@ public class EpochAwareDebounce<T>
        this.executor = ExecutorFactory.Global.executorFactory().pooled("debounce", 2);
    }
-    public Future<T> getAsync(Callable<T> get, Epoch epoch)
+    /**
+     * Deduplicate requests to catch up log state based on the desired epoch. Callers supply a target epoch and
+     * a function to obtain the ClusterMetadata that corresponds with it. It is expected that this function will make rpc
+     * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
+     * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
+     * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code
+     * LogState}. These promises are memoized in order to cancel them when {@link #shutdownAndWait(long, TimeUnit)} is
+     * called. This causes the fetch function to stop waiting on any in flight {@code LogState} requests and prevents
+     * shutdown from being blocked.
+     *
+     * @param fetchFunction executes the request for LogState. It's expected that this populates fetchResult with the
+     *                      successful result.
+     * @param epoch the desired epoch
+     * @return a future which completes with the {@code ClusterMetadata} at or after the requested epoch
+     */
+    public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, ClusterMetadata> fetchFunction,
+                                            Epoch epoch)
    {
        while (true)
        {
-            EpochAwareAsyncPromise<T> running = currentFuture.get();
+            EpochAwareAsyncPromise running = currentFuture.get();
            if (running != null && !running.isDone() && running.epoch.isEqualOrAfter(epoch))
                return running;
-            EpochAwareAsyncPromise<T> promise = new EpochAwareAsyncPromise<>(epoch);
+            Promise<LogState> fetchResult = new AsyncPromise<>();
+
+            EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
            if (currentFuture.compareAndSet(running, promise))
            {
+                fetchResult.addCallback((logState, error) -> {
+                    logger.debug("Removing future remotely requesting epoch {} from in flight list", epoch);
+                    inflightRequests.remove(fetchResult);
+                });
+                inflightRequests.add(fetchResult);
+
                executor.submit(() -> {
                    try
                    {
-                        promise.setSuccess(get.call());
+                        promise.setSuccess(fetchFunction.apply(fetchResult));
                    }
                    catch (Throwable t)
                    {
+                        fetchResult.cancel(true);
+                        inflightRequests.remove(fetchResult);
                        promise.setFailure(t);
                    }
                });
@@ -71,7 +108,7 @@ public class EpochAwareDebounce<T>
}
}
-    private static class EpochAwareAsyncPromise<T> extends AsyncPromise<T>
+    private static class EpochAwareAsyncPromise extends AsyncPromise<ClusterMetadata>
{
private final Epoch epoch;
public EpochAwareAsyncPromise(Epoch epoch)
@@ -79,4 +116,12 @@ public class EpochAwareDebounce<T>
this.epoch = epoch;
}
}
+
+    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+    {
+        logger.info("Cancelling {} in flight log fetch requests", inflightRequests.size());
+        for (Promise<LogState> toCancel : inflightRequests)
+            toCancel.cancel(true);
+        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+    }
}
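
For readers skimming the diff, the pattern above condenses to the following self-contained sketch, using JDK stand-ins (CompletableFuture, long epochs) for Cassandra's Promise, Epoch and LogState types; every name below is illustrative rather than the project's actual API:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Illustrative JDK stand-in for EpochAwareDebounce; not the project's API.
final class DebounceSketch<T>
{
    // A future tagged with the epoch it was created to satisfy.
    static final class EpochFuture<V> extends CompletableFuture<V>
    {
        final long epoch;
        EpochFuture(long epoch) { this.epoch = epoch; }
    }

    private final AtomicReference<EpochFuture<T>> current = new AtomicReference<>();
    private final List<CompletableFuture<?>> inflight = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    // fetchFunction is expected to block on the supplied future while it performs
    // its rpc; memoizing that future is what lets shutdown() cancel the wait.
    CompletableFuture<T> getAsync(Function<CompletableFuture<Object>, T> fetchFunction, long epoch)
    {
        while (true)
        {
            EpochFuture<T> running = current.get();
            // Debounce: an in-flight request for this epoch or a later one suffices.
            if (running != null && !running.isDone() && running.epoch >= epoch)
                return running;
            CompletableFuture<Object> fetchResult = new CompletableFuture<>();
            EpochFuture<T> promise = new EpochFuture<>(epoch);
            if (current.compareAndSet(running, promise))
            {
                fetchResult.whenComplete((r, e) -> inflight.remove(fetchResult));
                inflight.add(fetchResult);
                executor.submit(() -> {
                    try { promise.complete(fetchFunction.apply(fetchResult)); }
                    catch (Throwable t)
                    {
                        fetchResult.cancel(true);
                        inflight.remove(fetchResult);
                        promise.completeExceptionally(t);
                    }
                });
                return promise;
            }
            // Lost the CAS race: loop and re-evaluate against the new current future.
        }
    }

    void shutdown() throws InterruptedException
    {
        // Unblock any fetch function still waiting on its rpc result, then stop the pool.
        for (CompletableFuture<?> f : inflight)
            f.cancel(true);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

The two key points are the compare-and-swap on the epoch-tagged future, which deduplicates concurrent callers, and the memoized in-flight list, which is what allows shutdown to release a worker stuck waiting on a dead peer.
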
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index 29c7e6b1cb..61cbc63eac 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
public class PeerLogFetcher
{
@@ -72,10 +74,11 @@ public class PeerLogFetcher
    public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast)
    {
-        return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
+        Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
+        return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
    }

-    private ClusterMetadata fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
+    private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
{
Epoch before = ClusterMetadata.current().epoch;
if (before.isEqualOrAfter(awaitAtleast))
@@ -85,11 +88,13 @@ public class PeerLogFetcher
        try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
        {
-            LogState logState = RemoteProcessor.sendWithCallback(Verb.TCM_FETCH_PEER_LOG_REQ,
-                                                                 new FetchPeerLog(before),
-                                                                 new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
-                                                                 Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-                                                                                      new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
+            RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+                                                  Verb.TCM_FETCH_PEER_LOG_REQ,
+                                                  new FetchPeerLog(before),
+                                                  new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
+                                                  Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                       new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
+            LogState logState = remoteRequest.awaitUninterruptibly().get();
log.append(logState);
ClusterMetadata fetched = log.waitForHighestConsecutive();
if (fetched.epoch.isEqualOrAfter(awaitAtleast))
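
The essential shape of this change is an inversion of where the blocking happens: the rpc is fired asynchronously and the worker then blocks on the same promise the debouncer memoized, so cancelling that promise at shutdown releases the worker. A rough JDK-only illustration, with all names hypothetical:

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

final class FetchSketch
{
    // Stand-in for the rpc layer: completes the promise from a response callback.
    static void sendAsync(CompletableFuture<String> promise)
    {
        new Thread(() -> promise.complete("log-state")).start();
    }

    // Mirrors fetchLogEntriesAndWaitInternal: the worker blocks on the same
    // promise the debouncer memoized, so a shutdown-time cancel(true) unblocks it.
    static String fetchAndWait(CompletableFuture<String> remoteRequest)
    {
        sendAsync(remoteRequest);
        try
        {
            return remoteRequest.join(); // throws CancellationException if cancelled
        }
        catch (CancellationException e)
        {
            return null; // shutdown in progress; give up quietly
        }
    }

    public static void main(String[] args)
    {
        System.out.println(fetchAndWait(new CompletableFuture<>()));
    }
}
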
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 260d151419..a647d4d8b0 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
@@ -49,6 +50,7 @@ import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Promise;
import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR;
@@ -124,13 +126,19 @@ public final class RemoteProcessor implements Processor
@Override
    public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy)
    {
-        // Synchronous, non-debounced call if we are waiting for the highest epoch. Should be used sparingly.
+        Function<Promise<LogState>, ClusterMetadata> fetchFunction =
+            promise -> fetchLogAndWaitInternal(promise,
+                                               new CandidateIterator(candidates(true), false),
+                                               log);
+        // Synchronous, non-debounced call if we are waiting for the highest epoch (without knowing/caring what it is).
+        // Should be used sparingly.
        if (waitFor == null)
-            return fetchLogAndWaitInternal();
+            return fetchFunction.apply(new AsyncPromise<>());
        try
        {
-            return EpochAwareDebounce.instance.getAsync(this::fetchLogAndWaitInternal, waitFor).get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS);
+            Future<ClusterMetadata> cmFuture = EpochAwareDebounce.instance.getAsync(fetchFunction, waitFor);
+            return cmFuture.get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS);
}
catch (InterruptedException e)
{
@@ -142,29 +150,37 @@ public final class RemoteProcessor implements Processor
}
}
- private ClusterMetadata fetchLogAndWaitInternal()
+    public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log)
    {
-        return fetchLogAndWait(new CandidateIterator(candidates(true), false), log);
+        Promise<LogState> remoteRequest = new AsyncPromise<>();
+        return fetchLogAndWaitInternal(remoteRequest, candidateIterator, log);
    }

-    public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log)
+    private static ClusterMetadata fetchLogAndWaitInternal(Promise<LogState> remoteRequest,
+                                                           CandidateIterator candidates,
+                                                           LocalLog log)
    {
        try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time())
        {
            Epoch currentEpoch = log.metadata().epoch;
-            LogState replay = sendWithCallback(Verb.TCM_FETCH_CMS_LOG_REQ,
-                                               new FetchCMSLog(currentEpoch, ClusterMetadataService.state() == REMOTE),
-                                               candidateIterator,
-                                               new Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
+            sendWithCallbackAsync(remoteRequest,
+                                  Verb.TCM_FETCH_CMS_LOG_REQ,
+                                  new FetchCMSLog(currentEpoch, ClusterMetadataService.state() == REMOTE),
+                                  candidates,
+                                  new Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
+            LogState replay = remoteRequest.awaitUninterruptibly().get();
            if (!replay.isEmpty())
            {
                logger.info("Replay request returned replay data: {}", replay);
                log.append(replay);
                TCMMetrics.instance.cmsLogEntriesFetched(currentEpoch, replay.latestEpoch());
}
-
return log.waitForHighestConsecutive();
}
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
}
// todo rename to send with retries or something
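
sendWithCallbackAsync follows the same contract: the response callback completes the caller-supplied promise instead of the sender blocking internally. Below is a sketch of that contract with JDK types, assuming (as the CandidateIterator name suggests) that a failure falls through to the next candidate peer; the real retry/backoff plumbing is richer than shown, and all names here are illustrative:

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class SendSketch
{
    // Stand-in for the messaging layer: delivers a response or a failure.
    static void send(String to, String request, BiConsumer<String, Throwable> onDone)
    {
        onDone.accept("response-from-" + to, null);
    }

    // Async send with candidate failover: a failure retries the next candidate;
    // success completes the caller's promise. The caller decides how (and whether) to wait.
    static void sendWithCallbackAsync(CompletableFuture<String> promise, String request, Iterator<String> candidates)
    {
        if (!candidates.hasNext())
        {
            promise.completeExceptionally(new IllegalStateException("Ran out of candidates"));
            return;
        }
        send(candidates.next(), request, (response, failure) -> {
            if (failure == null)
                promise.complete(response);
            else
                sendWithCallbackAsync(promise, request, candidates); // try the next peer
        });
    }

    public static void main(String[] args)
    {
        CompletableFuture<String> promise = new CompletableFuture<>();
        sendWithCallbackAsync(promise, "FETCH_LOG", List.of("peer1", "peer2").iterator());
        System.out.println(promise.join());
    }
}
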
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 25b2cb8703..a2b225ca0b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -131,12 +131,14 @@ import org.apache.cassandra.service.paxos.PaxosRepair;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.async.NettyStreamingChannel;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.EpochAwareDebounce;
import org.apache.cassandra.tcm.Startup;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
@@ -925,7 +927,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                () -> SSTableReader.shutdownBlocking(1L, MINUTES),
                                () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
                                () -> ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES),
-                                () -> org.apache.cassandra.service.snapshot.SnapshotManager.shutdownAndWait(1L, MINUTES)
+                                () -> SnapshotManager.shutdownAndWait(1L, MINUTES),
+                                () -> EpochAwareDebounce.instance.shutdownAndWait(1L, MINUTES)
                                );
internodeMessagingStarted = false;
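
The motivation for the new shutdown hook, in miniature: a debounce pool thread blocked on an in-flight promise would otherwise prevent executor termination once messaging is torn down. A minimal JDK demonstration of the cancel-then-shutdown ordering (illustrative only):

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Why the hook matters: a pool thread blocked on a never-completing future
// keeps shutdown hanging unless the future is cancelled first.
final class ShutdownSketch
{
    public static void main(String[] args) throws Exception
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Object> inflight = new CompletableFuture<>();
        executor.submit(() -> {
            try { inflight.join(); }                 // worker waits for an rpc that will never answer
            catch (CancellationException ignored) {} // cancel(true) releases it
        });
        inflight.cancel(true);                       // analogous to EpochAwareDebounce.shutdownAndWait
        executor.shutdown();
        System.out.println("terminated: " + executor.awaitTermination(5, TimeUnit.SECONDS));
    }
}
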
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
new file mode 100644
index 0000000000..3e8cb9c297
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSCatchupTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+
+import static org.junit.Assert.assertTrue;
+
+public class CMSCatchupTest extends TestBaseImpl
+{
+ @Test
+ public void testCMSCatchup() throws Exception
+ {
+        try (Cluster cluster = init(builder().withNodes(4)
+                                             .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for addtocms below
+                                             .start()))
+ {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+            cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
+            cluster.get(3).runOnInstance(() -> AddToCMS.initiate());
+            // isolate node2 from the other CMS members to ensure it's behind
+            cluster.filters().inbound().from(1).to(2).drop();
+            cluster.filters().inbound().from(3).to(2).drop();
+            AtomicInteger fetchedFromPeer = new AtomicInteger();
+            cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> {
+                if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
+                    fetchedFromPeer.getAndIncrement();
+                return false;
+            }).drop().on();
+
+            long mark = cluster.get(4).logs().mark();
+            cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE);
+            cluster.get(4).logs().watchFor(mark, "AlterOptions");
+
+            mark = cluster.get(2).logs().mark();
+            cluster.get(1).shutdown().get();
+            cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown");
+            // node2, a CMS member, is now behind and node1 is shut down.
+            // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4
+            int before = fetchedFromPeer.get();
+            cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM);
+            assertTrue(fetchedFromPeer.get() > before);
+ }
+ }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
new file mode 100644
index 0000000000..d3b549d20f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.ClusterState;
+import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+
+import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FetchLogFromPeers2Test extends TestBaseImpl
+{
+ @Test
+ public void testSchema() throws Exception
+ {
+ try (Cluster cluster = init(builder().withNodes(3)
+ .start()))
+ {
+            cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+            cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)"));
+
+ for (ClusterState clusterState : ClusterState.values())
+ for (Operation operation : Operation.values())
+ {
+ setupSchemaBehind(cluster);
+ runQuery(cluster, clusterState, operation);
+ }
+ }
+ }
+
+    public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException
+ {
+ cluster.get(1).shutdown().get();
+
+ // node2 is behind
+ String query;
+ switch (operation)
+ {
+ case READ:
+ query = "select * from %s.tbl where id = 5";
+ break;
+ case WRITE:
+ query = "insert into %s.tbl (id) values (5)";
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ int coordinator = coordinator(clusterState);
+ long mark = cluster.get(2).logs().mark();
+        long metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
+ if (clusterState == ClusterState.COORDINATOR_BEHIND)
+ {
+ long [] coordinatorBehindMetricsBefore = new long[cluster.size()];
+ try
+ {
+ for (int i = 1; i <= cluster.size(); i++)
+ if (!cluster.get(i).isShutdown())
+                        coordinatorBehindMetricsBefore[i - 1] = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
+                cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
+                fail("should fail");
+ }
+ catch (Exception ignored) {}
+
+ boolean metricBumped = false;
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ if (i == coordinator || cluster.get(i).isShutdown())
+ continue;
+                long metricAfter = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
+ if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0)
+ {
+ metricBumped = true;
+ break;
+ }
+ }
+            assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped);
+
+ }
+        cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
+        assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0);
+        long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
+ assertTrue(metricsAfter > metricsBefore);
+
+ cluster.get(1).startup();
+ }
+
+ public void setupSchemaBehind(Cluster cluster)
+ {
+ cluster.filters().reset();
+ cluster.filters().inbound().from(1).to(2).drop();
+        long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch());
+        cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE);
+ cluster.get(3).runOnInstance(() -> {
+ try
+ {
+                ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ cluster.filters().reset();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
index fbcd080a98..6996f0c190 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeersTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.distributed.test.log;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,7 +27,6 @@ import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IMessageFilters;
@@ -38,14 +36,12 @@ import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.tcm.sequences.AddToCMS;
import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
import static org.junit.Assert.assertTrue;
@@ -53,29 +49,10 @@ import static org.junit.Assert.fail;
public class FetchLogFromPeersTest extends TestBaseImpl
{
- enum ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND }
- enum Operation { READ, WRITE }
+ public enum ClusterState { COORDINATOR_BEHIND, REPLICA_BEHIND }
+ public enum Operation { READ, WRITE }
- @Test
- public void testSchema() throws Exception
- {
- try (Cluster cluster = init(builder().withNodes(3)
- .start()))
- {
-            cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
-            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
-            cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)"));
-
- for (ClusterState clusterState : ClusterState.values())
- for (Operation operation : Operation.values())
- {
- setupSchemaBehind(cluster);
- runQuery(cluster, clusterState, operation);
- }
- }
- }
-
- public int coordinator(ClusterState clusterState)
+ public static int coordinator(ClusterState clusterState)
{
switch (clusterState)
{
@@ -87,81 +64,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl
throw new IllegalStateException();
}
-    public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException
- {
- cluster.get(1).shutdown().get();
-
- // node2 is behind
- String query;
- switch (operation)
- {
- case READ:
- query = "select * from %s.tbl where id = 5";
- break;
- case WRITE:
- query = "insert into %s.tbl (id) values (5)";
- break;
- default:
- throw new IllegalStateException();
- }
- int coordinator = coordinator(clusterState);
- long mark = cluster.get(2).logs().mark();
-        long metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
- if (clusterState == ClusterState.COORDINATOR_BEHIND)
- {
- long [] coordinatorBehindMetricsBefore = new long[cluster.size()];
- try
- {
- for (int i = 1; i <= cluster.size(); i++)
- if (!cluster.get(i).isShutdown())
-                        coordinatorBehindMetricsBefore[i - 1] = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
-                cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
- fail("should fail");
- }
- catch (Exception ignored) {}
-
- boolean metricBumped = false;
- for (int i = 1; i <= cluster.size(); i++)
- {
- if (i == coordinator || cluster.get(i).isShutdown())
- continue;
-                long metricAfter = cluster.get(i).callOnInstance(() -> TCMMetrics.instance.coordinatorBehindSchema.getCount());
- if (metricAfter - coordinatorBehindMetricsBefore[i - 1] > 0)
- {
- metricBumped = true;
- break;
- }
- }
-            assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped);
-
- }
-        cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
-        assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0);
-        long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
- assertTrue(metricsAfter > metricsBefore);
-
- cluster.get(1).startup();
- }
-
- public void setupSchemaBehind(Cluster cluster)
- {
- cluster.filters().reset();
- cluster.filters().inbound().from(1).to(2).drop();
-        long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch());
-        cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE);
- cluster.get(3).runOnInstance(() -> {
- try
- {
-                ClusterMetadataService.instance().awaitAtLeast(Epoch.create(epochBefore).nextEpoch());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- });
- cluster.filters().reset();
- }
-
@Test
public void catchupCoordinatorBehindTestPlacements() throws Exception
{
@@ -263,40 +165,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl
}
}
- @Test
- public void testCMSCatchupTest() throws Exception
- {
- try (Cluster cluster = init(builder().withNodes(4)
-                                             .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)) // needed for addtocms below
- .start()))
- {
-            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
- cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
- cluster.get(3).runOnInstance(() -> AddToCMS.initiate());
- // isolate node2 from the other CMS members to ensure it's behind
- cluster.filters().inbound().from(1).to(2).drop();
- cluster.filters().inbound().from(3).to(2).drop();
- AtomicInteger fetchedFromPeer = new AtomicInteger();
-            cluster.filters().inbound().from(2).to(4).messagesMatching((from, to, msg) -> {
- if (msg.verb() == Verb.TCM_FETCH_PEER_LOG_REQ.id)
- fetchedFromPeer.getAndIncrement();
- return false;
- }).drop().on();
-
- long mark = cluster.get(4).logs().mark();
-            cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test 123'"), ConsistencyLevel.ONE);
- cluster.get(4).logs().watchFor(mark, "AlterOptions");
-
- mark = cluster.get(2).logs().mark();
- cluster.get(1).shutdown().get();
-            cluster.get(2).logs().watchFor(mark, "/127.0.0.1:7012 state jump to shutdown");
- // node2, a CMS member, is now behind and node1 is shut down.
-            // Try reading at QUORUM from node4, node2 should detect it's behind and catch up from node4
- int before = fetchedFromPeer.get();
-            cluster.coordinator(4).execute(withKeyspace("select * from %s.tbl where id = 55"), ConsistencyLevel.QUORUM);
- assertTrue(fetchedFromPeer.get() > before);
- }
- }
@Test
public void catchupWithSnapshot() throws Exception
@@ -370,7 +238,6 @@ public class FetchLogFromPeersTest extends TestBaseImpl
}
}
-
private static void executeAlters(Cluster cluster)
{
for (int i = 0; i < 10; i++)