This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new d8174c7105 CEP-15 (C*) Integrate accord with repair d8174c7105 is described below commit d8174c71054ade99b0d5209c3ca81f7300c5899e Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Thu Feb 15 10:52:55 2024 -0800 CEP-15 (C*) Integrate accord with repair Patch by Blake Eggleston; Reviewed by Ariel Weisberg and David Capwell for CASSANDRA-19472 --- .gitmodules | 4 +- modules/accord | 2 +- .../config/CassandraRelevantProperties.java | 1 + .../db/streaming/CassandraStreamReceiver.java | 2 +- .../org/apache/cassandra/metrics/TableMetrics.java | 12 +- .../apache/cassandra/repair/AbstractRepairJob.java | 66 ---- .../cassandra/repair/AbstractRepairTask.java | 3 +- .../apache/cassandra/repair/RepairCoordinator.java | 18 + .../{CassandraRepairJob.java => RepairJob.java} | 103 ++++- .../org/apache/cassandra/repair/RepairSession.java | 50 +-- .../cassandra/repair/messages/RepairOption.java | 51 ++- .../cassandra/repair/state/CoordinatorState.java | 4 - .../org/apache/cassandra/schema/TableMetadata.java | 7 + .../cassandra/service/ActiveRepairService.java | 5 +- .../cassandra/service/accord/AccordService.java | 69 +++- .../cassandra/service/accord/IAccordService.java | 8 + .../accord/repair/AccordRepair.java} | 104 ++++-- .../accord/repair/RepairSyncPointAdapter.java | 79 ++++ .../accord/repair/RequiredResponseTracker.java | 79 ++++ .../accord/serializers/ReadDataSerializers.java | 3 + .../migration/ConsensusMigrationRepairResult.java | 25 +- .../migration/ConsensusMigrationRepairType.java | 5 +- .../migration/ConsensusMigrationTarget.java | 11 + .../migration/ConsensusTableMigration.java | 7 +- ...beFinishConsensusMigrationForTableAndRange.java | 8 +- .../apache/cassandra/tools/nodetool/Repair.java | 4 + .../test/OptimiseStreamsRepairTest.java | 4 +- .../test/accord/AccordIncrementalRepairTest.java | 415 +++++++++++++++++++++ .../test/accord/AccordMigrationTest.java | 2 +- 
.../simulator/cluster/OnInstanceRepair.java | 2 +- .../cassandra/repair/FailingRepairFuzzTest.java | 7 +- .../org/apache/cassandra/repair/FuzzTestBase.java | 18 +- .../org/apache/cassandra/repair/RepairJobTest.java | 58 +-- .../apache/cassandra/repair/RepairSessionTest.java | 2 +- .../accord/AccordFastPathCoordinatorTest.java | 1 - .../cassandra/service/accord/AccordTestUtils.java | 19 + .../service/accord/AccordTopologyTest.java | 95 +---- .../service/accord/AccordTopologyUtils.java | 131 +++++++ .../accord/repair/RequiredResponseTrackerTest.java | 97 +++++ 39 files changed, 1212 insertions(+), 369 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610..da620237a6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = ../cassandra-accord.git + branch = accord-IR diff --git a/modules/accord b/modules/accord index 8b4f3895cb..3aaec7566e 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 8b4f3895cb926f937450676b1db2e23d01a8b820 +Subproject commit 3aaec7566e389a0037b93b748867886fb68a0fd0 diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 85847d2158..a78e01c83e 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -35,6 +35,7 @@ import org.apache.cassandra.utils.StorageCompatibilityMode; /** A class that extracts system properties for the cassandra node it runs within. 
*/ public enum CassandraRelevantProperties { + ACCORD_AGENT_CLASS("cassandra.test.accord.agent"), ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL("cassandra.accord.repair.range_step_update_interval", "100"), ACQUIRE_RETRY_SECONDS("cassandra.acquire_retry_seconds", "60"), ACQUIRE_SLEEP_MS("cassandra.acquire_sleep_ms", "1000"), diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index e75b6be269..409a25c7bb 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -246,7 +246,7 @@ public class CassandraStreamReceiver implements StreamReceiver checkNotNull(minVersion, "Unable to determine minimum cluster version"); IAccordService accordService = AccordService.instance(); if (session.streamOperation().requiresBarrierTransaction() - && cfs.metadata().isAccordEnabled() + && cfs.metadata().requiresAccordSupport() && CassandraVersion.CASSANDRA_5_0.compareTo(minVersion) >= 0) accordService.postStreamReceivingBarrier(cfs, ranges); diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 93f55008d8..8d1b3e52ff 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -186,9 +186,9 @@ public class TableMetrics /** Latency for locally run key migrations **/ public final LatencyMetrics keyMigration; /** Latency for range migrations run by locally coordinated Accord repairs **/ - public final LatencyMetrics rangeMigration; - public final TableMeter rangeMigrationUnexpectedFailures; - public final TableMeter rangeMigrationDependencyLimitFailures; + public final LatencyMetrics accordRepair; + public final TableMeter accordRepairUnexpectedFailures; + public final TableMeter accordRepairDependencyLimitFailures; /** percent of the 
data that is repaired */ public final Gauge<Double> percentRepaired; /** Reports the size of sstables in repaired, unrepaired, and any ongoing repair buckets */ @@ -813,9 +813,9 @@ public class TableMetrics casPropose = createLatencyMetrics("CasPropose", cfs.keyspace.metric.casPropose); casCommit = createLatencyMetrics("CasCommit", cfs.keyspace.metric.casCommit); keyMigration = createLatencyMetrics("KeyMigration", cfs.keyspace.metric.keyMigration, GLOBAL_KEY_MIGRATION_LATENCY); - rangeMigration = createLatencyMetrics("RangeMigration", cfs.keyspace.metric.rangeMigration, GLOBAL_RANGE_MIGRATION_LATENCY); - rangeMigrationUnexpectedFailures = createTableMeter("RangeMigrationUnexpectedFailures", cfs.keyspace.metric.rangeMigrationUnexpectedFailures); - rangeMigrationDependencyLimitFailures = createTableMeter("RangeMigrationDependencyLimitFaiures", cfs.keyspace.metric.rangeMigrationDependencyLimitFailures); + accordRepair = createLatencyMetrics("AccordRepair", cfs.keyspace.metric.rangeMigration, GLOBAL_RANGE_MIGRATION_LATENCY); + accordRepairUnexpectedFailures = createTableMeter("AccordRepairUnexpectedFailures", cfs.keyspace.metric.rangeMigrationUnexpectedFailures); + accordRepairDependencyLimitFailures = createTableMeter("AccordRepairDependencyLimitFaiures", cfs.keyspace.metric.rangeMigrationDependencyLimitFailures); repairsStarted = createTableCounter("RepairJobsStarted"); repairsCompleted = createTableCounter("RepairJobsCompleted"); diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairJob.java b/src/java/org/apache/cassandra/repair/AbstractRepairJob.java deleted file mode 100644 index df3a67dbc9..0000000000 --- a/src/java/org/apache/cassandra/repair/AbstractRepairJob.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair; - -import java.util.concurrent.Executor; -import javax.annotation.Nullable; - -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.repair.state.JobState; -import org.apache.cassandra.utils.concurrent.AsyncFuture; - -public abstract class AbstractRepairJob extends AsyncFuture<RepairResult> implements Runnable -{ - protected final SharedContext ctx; - public final JobState state; - protected final RepairJobDesc desc; - protected final RepairSession session; - protected final Executor taskExecutor; - - protected final Keyspace ks; - protected final ColumnFamilyStore cfs; - - /** - * Create repair job to run on specific columnfamily - * @param session RepairSession that this RepairJob belongs - * @param columnFamily name of the ColumnFamily to repair - */ - public AbstractRepairJob(RepairSession session, String columnFamily) - { - this.ctx = session.ctx; - this.session = session; - this.taskExecutor = session.taskExecutor; - this.desc = new RepairJobDesc(session.state.parentRepairSession, session.getId(), session.state.keyspace, columnFamily, session.state.commonRange.ranges); - this.state = new JobState(ctx.clock(), desc, session.state.commonRange.endpoints); - this.ks = Keyspace.open(desc.keyspace); - this.cfs = ks.getColumnFamilyStore(columnFamily); - } - - public void run() - { 
- state.phase.start(); - cfs.metric.repairsStarted.inc(); - runRepair(); - } - - abstract protected void runRepair(); - - abstract void abort(@Nullable Throwable reason); -} diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java index 67ac2cd287..9bf3fe2f84 100644 --- a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java +++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java @@ -79,7 +79,8 @@ public abstract class AbstractRepairTask implements RepairTask options.optimiseStreams(), options.repairPaxos(), options.paxosOnly(), - options.accordRepair(), + options.accordOnly(), + options.isConsensusMigration(), executor, validationScheduler, cfnames); diff --git a/src/java/org/apache/cassandra/repair/RepairCoordinator.java b/src/java/org/apache/cassandra/repair/RepairCoordinator.java index dadbfcf6f2..f49530b5ba 100644 --- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java +++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java @@ -66,6 +66,7 @@ import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.repair.state.CoordinatorState; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SystemDistributedKeyspace; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; import org.apache.cassandra.service.ClientState; @@ -282,12 +283,29 @@ public class RepairCoordinator implements Runnable, ProgressEventNotifier, Repai } } + private void validate(RepairOption options) + { + if (options.paxosOnly() && options.accordOnly()) + throw new IllegalArgumentException("Cannot specify a repair as both paxos only and accord only"); + + for (ColumnFamilyStore cfs : columnFamilies) + { + TableMetadata metadata = cfs.metadata(); + if (options.paxosOnly() && 
!metadata.supportsPaxosOperations()) + throw new IllegalArgumentException(String.format("Cannot run paxos only repair on %s.%s, which isn't configured for paxos operations", cfs.keyspace.getName(), cfs.name)); + + if (options.accordOnly() && !metadata.requiresAccordSupport()) + throw new IllegalArgumentException(String.format("Cannot run accord only repair on %s.%s, which isn't configured for accord operations", cfs.keyspace.getName(), cfs.name)); + } + } + private void runMayThrow() throws Throwable { state.phase.setup(); ctx.repair().recordRepairStatus(state.cmd, ParentRepairStatus.IN_PROGRESS, ImmutableList.of()); populateColumnFamilies(); + validate(state.options); this.traceState = maybeCreateTraceState(columnFamilies); notifyStarting(); diff --git a/src/java/org/apache/cassandra/repair/CassandraRepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java similarity index 85% rename from src/java/org/apache/cassandra/repair/CassandraRepairJob.java rename to src/java/org/apache/cassandra/repair/RepairJob.java index 80daeff5d5..4ad1f803fb 100644 --- a/src/java/org/apache/cassandra/repair/CassandraRepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -40,6 +40,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; @@ -47,9 +50,10 @@ import org.apache.cassandra.repair.asymmetric.DifferenceHolder; import org.apache.cassandra.repair.asymmetric.HostDifferences; import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter; import org.apache.cassandra.repair.asymmetric.ReduceHelper; -import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.repair.state.JobState; import 
org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.repair.AccordRepair; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult; import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup; import org.apache.cassandra.streaming.PreviewKind; @@ -59,6 +63,7 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.AsyncFuture; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.concurrent.ImmediateFuture; @@ -70,11 +75,14 @@ import static org.apache.cassandra.service.paxos.Paxos.useV2; /** * RepairJob runs repair on given ColumnFamily. */ -public class CassandraRepairJob extends AbstractRepairJob +public class RepairJob extends AsyncFuture<RepairResult> implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(CassandraRepairJob.class); + private static final Logger logger = LoggerFactory.getLogger(RepairJob.class); + protected final Keyspace ks; + protected final ColumnFamilyStore cfs; private final SharedContext ctx; + public final JobState state; private final RepairJobDesc desc; private final RepairSession session; private final RepairParallelism parallelismDegree; @@ -91,14 +99,24 @@ public class CassandraRepairJob extends AbstractRepairJob * @param session RepairSession that this RepairJob belongs * @param columnFamily name of the ColumnFamily to repair */ - public CassandraRepairJob(RepairSession session, String columnFamily) + public RepairJob(RepairSession session, String columnFamily) { - super(session, columnFamily); this.ctx = session.ctx; this.session = session; this.taskExecutor = session.taskExecutor; this.parallelismDegree = 
session.parallelismDegree; this.desc = new RepairJobDesc(session.state.parentRepairSession, session.getId(), session.state.keyspace, columnFamily, session.state.commonRange.ranges); + this.ks = Keyspace.open(desc.keyspace); + this.cfs = ks.getColumnFamilyStore(columnFamily); + this.state = new JobState(ctx.clock(), desc, session.state.commonRange.endpoints); + + TableMetadata metadata = this.cfs.metadata(); + if (session.paxosOnly && !metadata.supportsPaxosOperations()) + throw new IllegalArgumentException(String.format("Cannot run paxos only repair on %s.%s, which isn't configured for paxos operations", cfs.keyspace.getName(), cfs.name)); + + if (session.accordOnly && !metadata.requiresAccordSupport()) + throw new IllegalArgumentException(String.format("Cannot run accord only repair on %s.%s, which isn't configured for accord operations", cfs.keyspace.getName(), cfs.name)); + } public long getNowInSeconds() @@ -114,25 +132,39 @@ public class CassandraRepairJob extends AbstractRepairJob } } + @Override + public void run() + { + state.phase.start(); + cfs.metric.repairsStarted.inc(); + runRepair(); + } + /** * Runs repair job. * <p/> * This sets up necessary task and runs them on given {@code taskExecutor}. * After submitting all tasks, waits until validation with replica completes. 
*/ - @Override protected void runRepair() { List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.state.commonRange.endpoints); allEndpoints.add(ctx.broadcastAddressAndPort()); + TableMetadata metadata = cfs.metadata(); Future<Void> paxosRepair; Epoch repairStartingEpoch = ClusterMetadata.current().epoch; - boolean doPaxosRepair = paxosRepairEnabled() && (((useV2() || isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly); + + Preconditions.checkArgument(!session.paxosOnly || !session.accordOnly); + boolean doPaxosRepair = paxosRepairEnabled() + && (((useV2() || isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly) + && metadata.supportsPaxosOperations() + && !session.accordOnly; + boolean doAccordRepair = metadata.requiresAccordSupport() && !session.paxosOnly; + if (doPaxosRepair) { logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); - TableMetadata metadata = Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily); paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor); } else @@ -141,6 +173,7 @@ public class CassandraRepairJob extends AbstractRepairJob paxosRepair = ImmediateFuture.success(null); } + if (session.paxosOnly) { paxosRepair.addCallback(new FutureCallback<>() @@ -148,12 +181,9 @@ public class CassandraRepairJob extends AbstractRepairJob public void onSuccess(Void ignored) { logger.info("{} {}.{} paxos repair completed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); - trySuccess(new RepairResult(desc, Collections.emptyList(), ConsensusMigrationRepairResult.fromCassandraRepair(repairStartingEpoch, false))); + trySuccess(new RepairResult(desc, Collections.emptyList(), ConsensusMigrationRepairResult.fromPaxosOnlyRepair(repairStartingEpoch, session.excludedDeadNodes))); } - /** - * Snapshot, validation and sync failures 
are all handled here - */ public void onFailure(Throwable t) { logger.warn("{} {}.{} paxos repair failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); @@ -163,6 +193,43 @@ public class CassandraRepairJob extends AbstractRepairJob return; } + Future<Void> accordRepair; + if (doAccordRepair) + { + accordRepair = paxosRepair.flatMap(unused -> { + logger.info("{} {}.{} starting accord repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + IPartitioner partitioner = metadata.partitioner; + AccordRepair repair = new AccordRepair(ctx, cfs, partitioner, desc.keyspace, desc.ranges, session.isConsensusMigration && session.accordOnly, allEndpoints); + return repair.repair(taskExecutor); + }, taskExecutor); + } + else + { + accordRepair = paxosRepair.flatMap(unused -> { + logger.info("{} {}.{} not running accord repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + return ImmediateFuture.success(null); + }); + } + + if (session.accordOnly) + { + accordRepair.addCallback(new FutureCallback<Void>() + { + public void onSuccess(Void ignored) + { + logger.info("{} {}.{} accord repair completed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + trySuccess(new RepairResult(desc, Collections.emptyList(), ConsensusMigrationRepairResult.fromAccordOnlyRepair(repairStartingEpoch, session.excludedDeadNodes))); + } + + public void onFailure(Throwable t) + { + logger.warn("{} {}.{} accord repair failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + tryFailure(t); + } + }, taskExecutor); + return; + } + // Create a snapshot at all nodes unless we're using pure parallel repairs final Future<?> allSnapshotTasks; if (parallelismDegree != RepairParallelism.PARALLEL) @@ -170,12 +237,12 @@ public class CassandraRepairJob extends AbstractRepairJob if (session.isIncremental) { // consistent repair does it's 
own "snapshotting" - allSnapshotTasks = paxosRepair.map(input -> allEndpoints); + allSnapshotTasks = accordRepair.map(input -> allEndpoints); } else { // Request snapshot to all replica - allSnapshotTasks = paxosRepair.flatMap(input -> { + allSnapshotTasks = accordRepair.flatMap(input -> { List<Future<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size()); state.phase.snapshotsSubmitted(); for (InetAddressAndPort endpoint : allEndpoints) @@ -198,7 +265,7 @@ public class CassandraRepairJob extends AbstractRepairJob // Run validations and the creation of sync tasks in the scheduler, so it can limit the number of Merkle trees // that there are in memory at once. When all validations complete, submit sync tasks out of the scheduler. - Future<List<SyncStat>> syncResults = session.validationScheduler.schedule(() -> createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints), taskExecutor) + Future<List<SyncStat>> syncResults = session.validationScheduler.schedule(() -> createSyncTasks(accordRepair, allSnapshotTasks, allEndpoints), taskExecutor) .flatMap(this::executeTasks, taskExecutor); // When all sync complete, set the final result @@ -215,7 +282,7 @@ public class CassandraRepairJob extends AbstractRepairJob } cfs.metric.repairsCompleted.inc(); logger.info("Completing repair with excludedDeadNodes {}", session.excludedDeadNodes); - trySuccess(new RepairResult(desc, stats, ConsensusMigrationRepairResult.fromCassandraRepair(repairStartingEpoch, doPaxosRepair && !session.excludedDeadNodes))); + trySuccess(new RepairResult(desc, stats, ConsensusMigrationRepairResult.fromRepair(repairStartingEpoch, doPaxosRepair, doAccordRepair, session.excludedDeadNodes))); } /** @@ -240,7 +307,7 @@ public class CassandraRepairJob extends AbstractRepairJob }, taskExecutor); } - private Future<List<SyncTask>> createSyncTasks(Future<Void> paxosRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints) + private Future<List<SyncTask>> 
createSyncTasks(Future<Void> accordRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints) { Future<List<TreeResponse>> treeResponses; if (allSnapshotTasks != null) @@ -256,7 +323,7 @@ public class CassandraRepairJob extends AbstractRepairJob else { // If not sequential, just send validation request to all replica - treeResponses = paxosRepair.flatMap(input -> sendValidationRequest(allEndpoints)); + treeResponses = accordRepair.flatMap(input -> sendValidationRequest(allEndpoints)); } treeResponses = treeResponses.map(a -> { diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 99b877a96b..230d77b52b 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -74,7 +74,7 @@ import org.apache.cassandra.utils.concurrent.AsyncFuture; * * A given RepairSession repairs a set of replicas for a given set of ranges on a list * of column families. For each of the column family to repair, RepairSession - * creates a {@link AbstractRepairJob} that handles the repair of that CF. + * creates a {@link RepairJob} that handles the repair of that CF. 
* * A given RepairJob has the 3 main phases: * <ol> @@ -120,8 +120,12 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I /** Range to repair */ public final boolean isIncremental; public final PreviewKind previewKind; - public final boolean repairPaxos; + public final boolean repairPaxos; // TODO (now): rename to repairPaxosIfSupported public final boolean paxosOnly; + + public final boolean accordOnly; + public final boolean isConsensusMigration; + public final boolean excludedDeadNodes; private final AtomicBoolean isFailed = new AtomicBoolean(false); @@ -136,22 +140,24 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I public final boolean optimiseStreams; public final SharedContext ctx; public final Scheduler validationScheduler; - private volatile List<AbstractRepairJob> jobs = Collections.emptyList(); - private final boolean accordRepair; + private volatile List<RepairJob> jobs = Collections.emptyList(); private volatile boolean terminated = false; /** * Create new repair session. 
- * @param parentRepairSession the parent sessions id - * @param commonRange ranges to repair - * @param excludedDeadNodes Was the repair started for --force and were dead nodes excluded as a result - * @param keyspace name of keyspace - * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees - * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption) - * @param repairPaxos true if incomplete paxos operations should be completed as part of repair - * @param paxosOnly true if we should only complete paxos operations, not run a normal repair - * @param cfnames names of columnfamilies + * + * @param parentRepairSession the parent sessions id + * @param commonRange ranges to repair + * @param excludedDeadNodes Was the repair started for --force and were dead nodes excluded as a result + * @param keyspace name of keyspace + * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees + * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption) + * @param repairPaxos true if incomplete paxos operations should be completed as part of repair + * @param paxosOnly true if we should only complete paxos operations, not run a normal repair + * @param accordOnly true if we should only complete accord operations, not run a normal repair + * @param isConsensusMigration true if this repair is being run by the consensus migration tool (affects accord repair availability requirements) + * @param cfnames names of columnfamilies */ public RepairSession(SharedContext ctx, Scheduler validationScheduler, @@ -166,13 +172,15 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, - boolean accordRepair, + boolean accordOnly, + boolean 
isConsensusMigration, String... cfnames) { this.ctx = ctx; this.validationScheduler = validationScheduler; this.repairPaxos = repairPaxos; this.paxosOnly = paxosOnly; + this.isConsensusMigration = isConsensusMigration; assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; this.state = new SessionState(ctx.clock(), parentRepairSession, keyspace, cfnames, commonRange); this.parallelismDegree = parallelismDegree; @@ -181,7 +189,7 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I this.pullRepair = pullRepair; this.optimiseStreams = optimiseStreams; this.taskExecutor = new SafeExecutor(createExecutor(ctx)); - this.accordRepair = accordRepair; + this.accordOnly = accordOnly; this.excludedDeadNodes = excludedDeadNodes; } @@ -305,7 +313,7 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I logger.info("{} parentSessionId = {}: new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), state.parentRepairSession, repairedNodes(), state.commonRange, state.keyspace, Arrays.toString(state.cfnames)); Tracing.traceRepair("Syncing range {}", state.commonRange); - if (!previewKind.isPreview() && !paxosOnly) + if (!previewKind.isPreview() && !paxosOnly && !accordOnly) { SystemDistributedKeyspace.startRepairs(getId(), state.parentRepairSession, state.keyspace, state.cfnames, state.commonRange); } @@ -343,12 +351,10 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I // Create and submit RepairJob for each ColumnFamily state.phase.jobsSubmitted(); - List<AbstractRepairJob> jobs = new ArrayList<>(state.cfnames.length); + List<RepairJob> jobs = new ArrayList<>(state.cfnames.length); for (String cfname : state.cfnames) { - AbstractRepairJob job = accordRepair ? 
- new AccordRepairJob(this, cfname) : - new CassandraRepairJob(this, cfname); + RepairJob job = new RepairJob(this, cfname); // Repairs can drive forward progress for consensus migration so always check job.addCallback(ConsensusTableMigration.completedRepairJobHandler); state.register(job.state); @@ -390,10 +396,10 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I public synchronized void terminate(@Nullable Throwable reason) { terminated = true; - List<AbstractRepairJob> jobs = this.jobs; + List<RepairJob> jobs = this.jobs; if (jobs != null) { - for (AbstractRepairJob job : jobs) + for (RepairJob job : jobs) job.abort(reason); } this.jobs = null; diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index f7b202dafb..e7ff9bc6c9 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -59,8 +59,7 @@ public class RepairOption public static final String IGNORE_UNREPLICATED_KS = "ignoreUnreplicatedKeyspaces"; public static final String REPAIR_PAXOS_KEY = "repairPaxos"; public static final String PAXOS_ONLY_KEY = "paxosOnly"; - - public static final String ACCORD_REPAIR_KEY = "accordRepair"; + public static final String ACCORD_ONLY_KEY = "accordOnly"; // we don't want to push nodes too much for repair public static final int MAX_JOB_THREADS = 4; @@ -199,21 +198,16 @@ public class RepairOption boolean ignoreUnreplicatedKeyspaces = Boolean.parseBoolean(options.get(IGNORE_UNREPLICATED_KS)); boolean repairPaxos = Boolean.parseBoolean(options.get(REPAIR_PAXOS_KEY)); boolean paxosOnly = Boolean.parseBoolean(options.get(PAXOS_ONLY_KEY)); - boolean accordRepair = Boolean.parseBoolean(options.get(ACCORD_REPAIR_KEY)); + boolean accordOnly = Boolean.parseBoolean(options.get(ACCORD_ONLY_KEY)); + + if (paxosOnly && accordOnly) + throw new 
IllegalArgumentException("Cannot repair paxos and repair only"); if (previewKind != PreviewKind.NONE) { Preconditions.checkArgument(!repairPaxos, "repairPaxos must be set to false for preview repairs"); Preconditions.checkArgument(!paxosOnly, "paxosOnly must be set to false for preview repairs"); - Preconditions.checkArgument(!accordRepair, "accordRepair must be set to false for preview repairs"); - } - - if (accordRepair) - { - Preconditions.checkArgument(!paxosOnly, "paxosOnly must be set to false for Accord repairs"); - Preconditions.checkArgument(previewKind == PreviewKind.NONE, "Can't perform preview repair with an Accord repair"); - Preconditions.checkArgument(!force, "Accord repair only requires a quorum to work so force is not supported"); - incremental = false; + Preconditions.checkArgument(!accordOnly, "accordOnly must be set to false for preview repairs"); } int jobThreads = 1; @@ -231,7 +225,7 @@ public class RepairOption boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY)); - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, pullRepair, force, previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordRepair); + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, pullRepair, force, previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordOnly, false); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@ -313,14 +307,15 @@ public class RepairOption private final boolean repairPaxos; private final boolean paxosOnly; - private final boolean accordRepair; + private final boolean accordOnly; + private final boolean isConsensusMigration; private final Collection<String> columnFamilies = new HashSet<>(); private final Collection<String> dataCenters = new HashSet<>(); private final Collection<String> hosts = new HashSet<>(); private 
final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly, boolean accordRepair) + public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly, boolean accordOnly, boolean isConsensusMigration) { this.parallelism = parallelism; @@ -328,6 +323,7 @@ public class RepairOption this.incremental = incremental; this.trace = trace; this.jobThreads = jobThreads; + this.isConsensusMigration = isConsensusMigration; this.ranges.addAll(ranges); this.pullRepair = pullRepair; this.forceRepair = forceRepair; @@ -336,17 +332,7 @@ public class RepairOption this.ignoreUnreplicatedKeyspaces = ignoreUnreplicatedKeyspaces; this.repairPaxos = repairPaxos; this.paxosOnly = paxosOnly; - this.accordRepair = accordRepair; - } - - public RepairOption withAccordRepair(boolean accordRepair) - { - RepairOption repairOption = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, pullRepair, forceRepair, previewKind, optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordRepair); - repairOption.columnFamilies.addAll(columnFamilies); - repairOption.dataCenters.addAll(dataCenters); - repairOption.hosts.addAll(hosts); - repairOption.ranges.addAll(ranges); - return repairOption; + this.accordOnly = accordOnly; } public RepairParallelism getParallelism() @@ -457,9 +443,14 @@ public class RepairOption return paxosOnly; } - public boolean accordRepair() + public boolean 
accordOnly() + { + return accordOnly; + } + + public boolean isConsensusMigration() { - return accordRepair; + return isConsensusMigration; } @Override @@ -481,7 +472,7 @@ public class RepairOption ", ignore unreplicated keyspaces: "+ ignoreUnreplicatedKeyspaces + ", repairPaxos: " + repairPaxos + ", paxosOnly: " + paxosOnly + - ", accordRepair: " + accordRepair + + ", accordOnly: " + accordOnly + ')'; } @@ -503,7 +494,7 @@ public class RepairOption options.put(OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams)); options.put(REPAIR_PAXOS_KEY, Boolean.toString(repairPaxos)); options.put(PAXOS_ONLY_KEY, Boolean.toString(paxosOnly)); - options.put(ACCORD_REPAIR_KEY, Boolean.toString(accordRepair)); + options.put(ACCORD_ONLY_KEY, Boolean.toString(accordOnly)); return options; } } diff --git a/src/java/org/apache/cassandra/repair/state/CoordinatorState.java b/src/java/org/apache/cassandra/repair/state/CoordinatorState.java index fbd184737e..35fbbf6bc7 100644 --- a/src/java/org/apache/cassandra/repair/state/CoordinatorState.java +++ b/src/java/org/apache/cassandra/repair/state/CoordinatorState.java @@ -77,10 +77,6 @@ public class CoordinatorState extends AbstractState<CoordinatorState.State, Time default: throw new AssertionError("Unknown preview kind: " + options.getPreviewKind()); } } - else if (options.accordRepair()) - { - return "accord repair"; - } else if (options.isIncremental()) { return "incremental"; diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java index 338b7dbc18..9838f38299 100644 --- a/src/java/org/apache/cassandra/schema/TableMetadata.java +++ b/src/java/org/apache/cassandra/schema/TableMetadata.java @@ -66,6 +66,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.service.accord.fastpath.FastPathStrategy; +import 
org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.serialization.UDTAndFunctionsAwareMetadataSerializer; @@ -340,6 +341,12 @@ public class TableMetadata implements SchemaElement return isAccordEnabled() || migratingFromAccord(); } + public boolean supportsPaxosOperations() + { + return params.transactionalMode == TransactionalMode.off + || params.transactionalMigrationFrom.from == TransactionalMode.off; + } + public ImmutableCollection<ColumnMetadata> columns() { return columns.values(); diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 8fcee2610f..a2a89e1074 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -454,7 +454,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, - boolean accordRepair, + boolean accordOnly, + boolean isConsensusMigration, ExecutorPlus executor, Scheduler validationScheduler, String... 
cfnames) @@ -472,7 +473,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai range, excludedDeadNodes, keyspace, parallelismDegree, isIncremental, pullRepair, previewKind, optimiseStreams, repairPaxos, paxosOnly, - accordRepair, cfnames); + accordOnly, isConsensusMigration, cfnames); repairs.getIfPresent(parentRepairSession).register(session.state); sessions.put(session.getId(), session); diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 6e136029a0..54f8c369ff 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -20,10 +20,14 @@ package org.apache.cassandra.service.accord; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; @@ -32,12 +36,18 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; +import accord.coordinate.Barrier; +import accord.coordinate.CoordinateSyncPoint; import accord.coordinate.TopologyMismatch; import accord.impl.CoordinateDurabilityScheduling; +import accord.primitives.SyncPoint; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.statements.RequestValidations; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory; import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.service.accord.repair.RepairSyncPointAdapter; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.service.accord.api.*; import org.apache.cassandra.utils.*; @@ -150,6 +160,12 @@ public class AccordService implements IAccordService, Shutdownable throw new UnsupportedOperationException("No accord barriers should be executed when accord.enabled = false in cassandra.yaml"); } + @Override + public long repair(@Nonnull Seekables keysOrRanges, long epoch, long queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> allEndpoints) + { + throw new UnsupportedOperationException("No accord repairs should be executed when accord.enabled = false in cassandra.yaml"); + } + @Override public @Nonnull TxnResult coordinate(@Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull long queryStartNanos) { @@ -287,7 +303,7 @@ public class AccordService implements IAccordService, Shutdownable { Invariants.checkState(localId != null, "static localId must be set before instantiating AccordService"); logger.info("Starting accord with nodeId {}", localId); - AccordAgent agent = new AccordAgent(); + AccordAgent agent = FBUtilities.construct(CassandraRelevantProperties.ACCORD_AGENT_CLASS.getString(AccordAgent.class.getName()), "AccordAgent"); this.configService = new AccordConfigurationService(localId); this.fastPathCoordinator = AccordFastPathCoordinator.create(localId, configService); this.messageSink = new AccordMessageSink(agent, configService); @@ -341,8 +357,7 @@ public class AccordService implements IAccordService, Shutdownable return requestHandler; } - @Override - public long barrier(@Nonnull Seekables keysOrRanges, long epoch, long queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite) + private <S extends Seekables<?, ?>> long barrier(@Nonnull S keysOrRanges, long epoch, long queryStartNanos, 
long timeoutNanos, BarrierType barrierType, boolean isForWrite, BiFunction<Node, S, AsyncResult<SyncPoint<S>>> syncPoint) { AccordClientRequestMetrics metrics = isForWrite ? accordWriteMetrics : accordReadMetrics; TxnId txnId = null; @@ -350,7 +365,9 @@ public class AccordService implements IAccordService, Shutdownable { logger.debug("Starting barrier key: {} epoch: {} barrierType: {} isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite); txnId = node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain()); - AsyncResult<Timestamp> asyncResult = node.barrier(keysOrRanges, epoch, barrierType); + AsyncResult<Timestamp> asyncResult = syncPoint == null + ? Barrier.barrier(node, keysOrRanges, epoch, barrierType) + : Barrier.barrier(node, keysOrRanges, epoch, barrierType, syncPoint); long deadlineNanos = queryStartNanos + timeoutNanos; Timestamp barrierExecuteAt = AsyncChains.getBlocking(asyncResult, deadlineNanos - nanoTime(), NANOSECONDS); logger.debug("Completed in {}ms barrier key: {} epoch: {} barrierType: {} isForWrite {}", @@ -394,6 +411,23 @@ public class AccordService implements IAccordService, Shutdownable } } + @Override + public long barrier(@Nonnull Seekables keysOrRanges, long epoch, long queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite) + { + return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos, barrierType, isForWrite, null); + } + + public static <S extends Seekables<?, ?>> BiFunction<Node, S, AsyncResult<SyncPoint<S>>> repairSyncPoint(Set<Node.Id> allNodes) + { + return (node, seekables) -> CoordinateSyncPoint.coordinate(node, Kind.SyncPoint, seekables, RepairSyncPointAdapter.create(allNodes)); + } + + public long repair(@Nonnull Seekables keysOrRanges, long epoch, long queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> allEndpoints) + { + Set<Node.Id> allNodes = allEndpoints.stream().map(configService::mappedId).collect(Collectors.toUnmodifiableSet()); + 
return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos, barrierType, isForWrite, repairSyncPoint(allNodes)); + } + private static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean global) { return new ReadTimeoutException(global ? ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString()); @@ -404,14 +438,13 @@ public class AccordService implements IAccordService, Shutdownable return new ReadPreemptedException(global ? ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString()); } - @Override - public long barrierWithRetries(Seekables keysOrRanges, long minEpoch, BarrierType barrierType, boolean isForWrite) throws InterruptedException + private long doWithRetries(LongSupplier action, int retryAttempts, long initialBackoffMillis, long maxBackoffMillis) throws InterruptedException { // Since we could end up having the barrier transaction or the transaction it listens to invalidated CoordinationFailed existingFailures = null; Long success = null; long backoffMillis = 0; - for (int attempt = 0; attempt < DatabaseDescriptor.getAccordBarrierRetryAttempts(); attempt++) + for (int attempt = 0; attempt < retryAttempts; attempt++) { try { @@ -423,10 +456,10 @@ public class AccordService implements IAccordService, Shutdownable e.addSuppressed(existingFailures); throw e; } - backoffMillis = backoffMillis == 0 ? DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis() : Math.min(backoffMillis * 2, DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis()); + backoffMillis = backoffMillis == 0 ? 
initialBackoffMillis : Math.min(backoffMillis * 2, maxBackoffMillis); try { - success = AccordService.instance().barrier(keysOrRanges, minEpoch, Clock.Global.nanoTime(), DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, isForWrite); + success = action.getAsLong(); break; } catch (CoordinationFailed newFailures) @@ -442,6 +475,24 @@ public class AccordService implements IAccordService, Shutdownable return success; } + @Override + public long barrierWithRetries(Seekables keysOrRanges, long minEpoch, BarrierType barrierType, boolean isForWrite) throws InterruptedException + { + return doWithRetries(() -> AccordService.instance().barrier(keysOrRanges, minEpoch, Clock.Global.nanoTime(), DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, isForWrite), + DatabaseDescriptor.getAccordBarrierRetryAttempts(), + DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(), + DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis()); + } + + @Override + public long repairWithRetries(Seekables keysOrRanges, long minEpoch, BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> allEndpoints) throws InterruptedException + { + return doWithRetries(() -> AccordService.instance().repair(keysOrRanges, minEpoch, Clock.Global.nanoTime(), DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, isForWrite, allEndpoints), + DatabaseDescriptor.getAccordBarrierRetryAttempts(), + DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(), + DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis()); + } + @Override public long currentEpoch() { diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index 42b957ba6a..ae0d08c48e 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -33,6 +33,7 @@ import 
org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; @@ -60,6 +61,13 @@ public interface IAccordService long barrier(@Nonnull Seekables keysOrRanges, long minEpoch, long queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite); + default long repairWithRetries(Seekables keysOrRanges, long minEpoch, BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> allEndpoints) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + long repair(@Nonnull Seekables keysOrRanges, long epoch, long queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> allEndpoints); + default void postStreamReceivingBarrier(ColumnFamilyStore cfs, List<Range<Token>> ranges) { String ks = cfs.keyspace.getName(); diff --git a/src/java/org/apache/cassandra/repair/AccordRepairJob.java b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java similarity index 67% rename from src/java/org/apache/cassandra/repair/AccordRepairJob.java rename to src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java index d82e4407fa..924662b38d 100644 --- a/src/java/org/apache/cassandra/repair/AccordRepairJob.java +++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java @@ -16,12 +16,14 @@ * limitations under the License. 
*/ -package org.apache.cassandra.repair; +package org.apache.cassandra.service.accord.repair; import java.math.BigInteger; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executor; import javax.annotation.Nullable; -import org.apache.cassandra.service.accord.AccordTopology; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,67 +31,88 @@ import accord.api.BarrierType; import accord.api.RoutingKey; import accord.primitives.Ranges; import accord.primitives.Seekables; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.AccordSplitter; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.AccordTopology; import org.apache.cassandra.service.accord.TokenRange; -import org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; import static com.google.common.base.Preconditions.checkState; -import static java.util.Collections.emptyList; import static org.apache.cassandra.config.CassandraRelevantProperties.ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL; /* * Accord repair consists of creating a barrier transaction for all the ranges which ensure that all Accord transactions * before the Epoch and point in time at which the repair started have their side effects visible to Paxos and regular quorum reads. 
*/ -public class AccordRepairJob extends AbstractRepairJob +public class AccordRepair { - private static final Logger logger = LoggerFactory.getLogger(AccordRepairJob.class); + private static final Logger logger = LoggerFactory.getLogger(AccordRepair.class); public static final BigInteger TWO = BigInteger.valueOf(2); + private final SharedContext ctx; + private final ColumnFamilyStore cfs; + private final Ranges ranges; private final AccordSplitter splitter; + private final boolean requireAllEndpoints; + private final List<InetAddressAndPort> endpoints; private BigInteger rangeStep; - private Epoch minEpoch = ClusterMetadata.current().epoch; + private final Epoch minEpoch = ClusterMetadata.current().epoch; private volatile Throwable shouldAbort = null; - public AccordRepairJob(RepairSession repairSession, String cfname) + public AccordRepair(SharedContext ctx, ColumnFamilyStore cfs, IPartitioner partitioner, String keyspace, Collection<Range<Token>> ranges, boolean requireAllEndpoints, List<InetAddressAndPort> endpoints) { - super(repairSession, cfname); - IPartitioner partitioner = desc.ranges.iterator().next().left.getPartitioner(); - this.ranges = AccordTopology.toAccordRanges(desc.keyspace, desc.ranges); - this.splitter = partitioner.accordSplitter().apply(ranges); + this.ctx = ctx; + this.cfs = cfs; + this.requireAllEndpoints = requireAllEndpoints; + this.endpoints = endpoints; + this.ranges = AccordTopology.toAccordRanges(keyspace, ranges); + this.splitter = partitioner.accordSplitter().apply(this.ranges); } - @Override - protected void runRepair() + public Epoch minEpoch() { - try - { - for (accord.primitives.Range range : ranges) - repairRange((TokenRange)range); - state.phase.success(); - cfs.metric.repairsCompleted.inc(); - trySuccess(new RepairResult(desc, emptyList(), ConsensusMigrationRepairResult.fromAccordRepair(minEpoch))); - } - catch (Throwable t) - { - state.phase.fail(t); - cfs.metric.repairsCompleted.inc(); - tryFailure(t); - } + return 
minEpoch; } - @Override - void abort(@Nullable Throwable reason) + public void repair() throws Throwable + { + for (accord.primitives.Range range : ranges) + repairRange((TokenRange)range); + } + + public Future<Void> repair(Executor executor) + { + AsyncPromise<Void> future = new AsyncPromise<>(); + executor.execute(() -> { + try + { + repair(); + future.trySuccess(null); + } + catch (Throwable e) + { + future.tryFailure(e); + } + }); + return future; + } + + protected void abort(@Nullable Throwable reason) { shouldAbort = reason == null ? new RuntimeException("Abort") : reason; } @@ -145,25 +168,32 @@ public class AccordRepairJob extends AbstractRepairJob checkState(!toRepair.equals(lastRepaired), "Shouldn't repair the same range twice"); checkState(lastRepaired == null || toRepair.start().equals(lastRepaired.end()), "Next range should directly follow previous range"); lastRepaired = toRepair; - AccordService.instance().barrierWithRetries(Seekables.of(toRepair), minEpoch.getEpoch(), BarrierType.global_sync, false); + + if (requireAllEndpoints) + { + AccordService.instance().repairWithRetries(Seekables.of(toRepair), minEpoch.getEpoch(), BarrierType.global_sync, false, endpoints); + } + else + { + AccordService.instance().barrierWithRetries(Seekables.of(toRepair), minEpoch.getEpoch(), BarrierType.global_sync, false); + } + remainingStart = toRepair.end(); } catch (RuntimeException e) { - // TODO Placeholder for dependency limit overflow -// dependencyOverflow = true; - cfs.metric.rangeMigrationDependencyLimitFailures.mark(); + cfs.metric.accordRepairUnexpectedFailures.mark(); throw e; } catch (Throwable t) { - // unexpected error - cfs.metric.rangeMigrationUnexpectedFailures.mark(); + cfs.metric.accordRepairUnexpectedFailures.mark(); throw new RuntimeException(t); } finally { - cfs.metric.rangeMigration.addNano(start); + long end = ctx.clock().nanoTime(); + cfs.metric.accordRepair.addNano(end - start); } // TODO when dependency limits are added to Accord need to 
test repair overflow diff --git a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java new file mode 100644 index 0000000000..76e29adbc5 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord.repair; + +import java.util.Collection; +import java.util.function.BiConsumer; + +import com.google.common.collect.ImmutableSet; + +import accord.api.Result; +import accord.coordinate.CoordinationAdapter; +import accord.coordinate.ExecutePath; +import accord.coordinate.ExecuteSyncPoint; +import accord.local.Node; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.Seekables; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.primitives.Writes; +import accord.topology.Topologies; + +/** + * Sync point adapter used for accord-only repairs. 
+ * + * Repair has the requirement that all client writes begun before the repair will be fully replicated once repair + * has completed. In the case of accord, repairs that compare data on disk satisfy this requirement by running + * a sync point as part of streaming if differences are found. For accord-only repairs, the barrier used by normal + * repairs is not sufficient since it only requires a quorum of nodes to respond before completing. This sync point + * adapter requires responses from all of the supplied endpoints before completing. Note that shards only block on the + * intersection of the provided replicas and their own endpoints. + */ +public class RepairSyncPointAdapter<S extends Seekables<?, ?>> extends CoordinationAdapter.Adapters.AbstractSyncPointAdapter<S> +{ + private final ImmutableSet<Node.Id> requiredResponses; + + public RepairSyncPointAdapter(Collection<Node.Id> requiredResponses) + { + this.requiredResponses = ImmutableSet.copyOf(requiredResponses); + } + + @Override + public void execute(Node node, Topologies all, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<S>, Throwable> callback) + { + RequiredResponseTracker tracker = new RequiredResponseTracker(requiredResponses, all); + ExecuteSyncPoint.ExecuteBlocking<S> execute = new ExecuteSyncPoint.ExecuteBlocking<>(node, tracker, new SyncPoint<>(txnId, deps, (S) txn.keys(), route), executeAt); + execute.addCallback(callback); + execute.start(); + } + + @Override + public void persist(Node node, Topologies all, FullRoute<?> route, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? 
super SyncPoint<S>, Throwable> callback) + { + throw new UnsupportedOperationException(); + } + + public static <S extends Seekables<?, ?>> CoordinationAdapter<SyncPoint<S>> create(Collection<Node.Id> requiredResponses) + { + return new RepairSyncPointAdapter<>(requiredResponses); + } +} diff --git a/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java new file mode 100644 index 0000000000..130e914969 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord.repair; + +import java.util.HashSet; +import java.util.Set; + +import accord.coordinate.tracking.AbstractSimpleTracker; +import accord.coordinate.tracking.RequestStatus; +import accord.coordinate.tracking.ShardTracker; +import accord.local.Node; +import accord.topology.Shard; +import accord.topology.Topologies; + +import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail; +import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange; +import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success; + +public class RequiredResponseTracker extends AbstractSimpleTracker<RequiredResponseTracker.RequiredResponseShardTracker> +{ + public static class RequiredResponseShardTracker extends ShardTracker + { + private final Set<Node.Id> outstandingResponses; + + public RequiredResponseShardTracker(Set<Node.Id> requiredResponses, Shard shard) + { + super(shard); + this.outstandingResponses = new HashSet<>(); + for (Node.Id id : shard.nodes) + { + if (requiredResponses.contains(id)) + outstandingResponses.add(id); + } + } + + public ShardOutcomes onSuccess(Node.Id node) + { + return outstandingResponses.remove(node) && outstandingResponses.isEmpty() ? Success : NoChange; + } + + public ShardOutcomes onFailure(Object ignore) + { + return !outstandingResponses.isEmpty() ? 
Fail : NoChange; + } + } + + public RequiredResponseTracker(Set<Node.Id> requiredResponses, Topologies topologies) + { + super(topologies, RequiredResponseShardTracker[]::new, shard -> new RequiredResponseShardTracker(requiredResponses, shard)); + } + + @Override + public RequestStatus recordSuccess(Node.Id node) + { + return recordResponse(this, node, RequiredResponseShardTracker::onSuccess, node); + } + + @Override + public RequestStatus recordFailure(Node.Id node) + { + return recordResponse(this, node, RequiredResponseShardTracker::onFailure, null); + } +} diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java index 1b48a53554..163e8f65f2 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java @@ -82,6 +82,7 @@ public class ReadDataSerializers CommandSerializers.txnId.serialize(msg.txnId, out, version); KeySerializers.participants.serialize(msg.readScope, out, version); out.writeUnsignedVInt(msg.executeAtEpoch); + CommandSerializers.timestamp.serialize(msg.executeAt, out, version); KeySerializers.fullRoute.serialize(msg.route, out, version); CommandSerializers.partialTxn.serialize(msg.txn, out, version); DepsSerializer.partialDeps.serialize(msg.deps, out, version); @@ -97,6 +98,7 @@ public class ReadDataSerializers CommandSerializers.txnId.deserialize(in, version), KeySerializers.participants.deserialize(in, version), in.readUnsignedVInt(), + CommandSerializers.timestamp.deserialize(in, version), KeySerializers.fullRoute.deserialize(in, version), CommandSerializers.partialTxn.deserialize(in, version), DepsSerializer.partialDeps.deserialize(in, version), @@ -111,6 +113,7 @@ public class ReadDataSerializers return CommandSerializers.txnId.serializedSize(msg.txnId, version) + 
KeySerializers.participants.serializedSize(msg.readScope, version) + TypeSizes.sizeofUnsignedVInt(msg.executeAtEpoch) + + CommandSerializers.timestamp.serializedSize(msg.executeAt, version) + KeySerializers.fullRoute.serializedSize(msg.route, version) + CommandSerializers.partialTxn.serializedSize(msg.txn, version) + DepsSerializer.partialDeps.serializedSize(msg.deps, version) diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java index 9233667b5a..2215c734c8 100644 --- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java +++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java @@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkArgument; public class ConsensusMigrationRepairResult { + private static final ConsensusMigrationRepairResult INELIGIBLE = new ConsensusMigrationRepairResult(ConsensusMigrationRepairType.ineligible, Epoch.EMPTY); public final ConsensusMigrationRepairType type; public final Epoch minEpoch; @@ -33,18 +34,24 @@ public class ConsensusMigrationRepairResult this.minEpoch = minEpoch; } - public static ConsensusMigrationRepairResult fromCassandraRepair(Epoch minEpoch, boolean migrationEligibleRepair) + public static ConsensusMigrationRepairResult fromRepair(Epoch minEpoch, boolean paxosAndDataRepaired, boolean accordRepaired, boolean deadNodesExcluded) { - checkArgument(!migrationEligibleRepair || minEpoch.isAfter(Epoch.EMPTY), "Epoch should not be empty if Paxos and regular repairs were performed"); - if (migrationEligibleRepair) - return new ConsensusMigrationRepairResult(ConsensusMigrationRepairType.paxos, minEpoch); - else - return new ConsensusMigrationRepairResult(ConsensusMigrationRepairType.ineligible, Epoch.EMPTY); + checkArgument((!paxosAndDataRepaired && !accordRepaired) || 
minEpoch.isAfter(Epoch.EMPTY), "Epoch should not be empty if Paxos and regular repairs were performed"); + + if (deadNodesExcluded) return INELIGIBLE; + if (paxosAndDataRepaired && accordRepaired) return new ConsensusMigrationRepairResult(ConsensusMigrationRepairType.either, minEpoch); + if (paxosAndDataRepaired) return new ConsensusMigrationRepairResult(ConsensusMigrationRepairType.paxos, minEpoch); + if (accordRepaired) return new ConsensusMigrationRepairResult(ConsensusMigrationRepairType.accord, minEpoch); + return INELIGIBLE; + } + + public static ConsensusMigrationRepairResult fromPaxosOnlyRepair(Epoch minEpoch, boolean deadNodesExcluded) + { + return fromRepair(minEpoch, false, false, deadNodesExcluded); } - public static ConsensusMigrationRepairResult fromAccordRepair(Epoch minEpoch) + public static ConsensusMigrationRepairResult fromAccordOnlyRepair(Epoch minEpoch, boolean deadNodesExcluded) { - checkArgument(minEpoch.isAfter(Epoch.EMPTY), "Accord repairs should always occur at an Epoch"); - return new ConsensusMigrationRepairResult(ConsensusMigrationRepairType.accord, minEpoch); + return fromRepair(minEpoch, false, true, deadNodesExcluded); } } diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java index 694b7fcd44..7bce208739 100644 --- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java +++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java @@ -24,7 +24,8 @@ public enum ConsensusMigrationRepairType { ineligible(0), paxos(1), - accord(2); + accord(2), + either(3); public final byte value; @@ -50,6 +51,8 @@ public enum ConsensusMigrationRepairType return ConsensusMigrationRepairType.paxos; case 2: return ConsensusMigrationRepairType.accord; + case 3: + return ConsensusMigrationRepairType.either; } } } diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java index 534ad1e99e..45c2e5fa12 100644 --- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java +++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java @@ -34,6 +34,17 @@ public enum ConsensusMigrationTarget this.value = SignedBytes.checkedCast(value); } + public boolean isMigratedBy(ConsensusMigrationRepairType repairType) + { + switch (repairType) + { + case either: return true; + case paxos: return this == accord; + case accord: return this == paxos; + default: return false; + } + } + public static ConsensusMigrationTarget fromString(String targetProtocol) { return ConsensusMigrationTarget.valueOf(targetProtocol.toLowerCase()); diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java index 3f25922a3e..ae6e2f0cb0 100644 --- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java +++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java @@ -90,9 +90,7 @@ public abstract class ConsensusTableMigration if (tms == null || !Range.intersects(tms.migratingRanges, desc.ranges)) return; - if (tms.targetProtocol == ConsensusMigrationTarget.paxos && repairResult.consensusMigrationRepairResult.type != ConsensusMigrationRepairType.accord) - return; - if (tms.targetProtocol == ConsensusMigrationTarget.accord && repairResult.consensusMigrationRepairResult.type != ConsensusMigrationRepairType.paxos) + if (!tms.targetProtocol.isMigratedBy(repairResult.consensusMigrationRepairResult.type)) return; ClusterMetadataService.instance().commit( @@ -310,7 +308,8 @@ public abstract class ConsensusTableMigration boolean ignoreUnreplicatedKeyspaces 
= true; boolean repairPaxos = !accordRepair; boolean paxosOnly = false; - RepairOption repairOption = new RepairOption(RepairParallelism.PARALLEL, primaryRange, incremental, trace, numJobThreads, intersectingRanges, pullRepair, forceRepair, PreviewKind.NONE, optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordRepair); + boolean accordOnly = false; + RepairOption repairOption = new RepairOption(RepairParallelism.PARALLEL, primaryRange, incremental, trace, numJobThreads, intersectingRanges, pullRepair, forceRepair, PreviewKind.NONE, optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordOnly, true); tables.forEach(table -> repairOption.getColumnFamilies().add(table.tableName)); return repairOption; } diff --git a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java index 7dff0111ef..1c1f3985fa 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java +++ b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java @@ -38,7 +38,6 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairType; -import org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget; import org.apache.cassandra.service.consensus.migration.ConsensusTableMigration; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.service.consensus.migration.TableMigrationState; @@ -138,11 +137,8 @@ public class MaybeFinishConsensusMigrationForTableAndRange implements Transforma if (tms == null) return new Rejected(INVALID, format("Table %s is not currently performing consensus 
migration", ksAndCF)); - if (tms.targetProtocol == ConsensusMigrationTarget.accord && repairType != ConsensusMigrationRepairType.paxos) - return new Rejected(INVALID, format("Table %s is not currently performing consensus migration to Accord and the repair was a Paxos repair", ksAndCF)); - - if (tms.targetProtocol == ConsensusMigrationTarget.paxos && repairType != ConsensusMigrationRepairType.accord) - return new Rejected(INVALID, format("Table %s is not currently performing consensus migration to Paxos and the repair was an Accord repair", ksAndCF)); + if (!tms.targetProtocol.isMigratedBy(repairType)) + return new Rejected(INVALID, format("Table %s is not currently performing consensus migration to %s and the repair was a %s repair", ksAndCF, tms.targetProtocol, repairType)); List<Range<Token>> normalizedRepairedRanges = normalize(repairedRanges); diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java index ff7eace231..d5e24caa57 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@ -105,6 +105,9 @@ public class Repair extends NodeToolCmd @Option(title = "paxos-only", name = {"-paxos-only", "--paxos-only"}, description = "If the --paxos-only flag is included, no table data is repaired, only paxos operations..") private boolean paxosOnly = false; + @Option(title = "accord-only", name = {"-accord-only", "--accord-only"}, description = "If the --accord-only flag is included, no table data is repaired, only accord operations.") + private boolean accordOnly = false; + @Option(title = "ignore_unreplicated_keyspaces", name = {"-iuk","--ignore-unreplicated-keyspaces"}, description = "Use --ignore-unreplicated-keyspaces to ignore keyspaces which are not replicated, otherwise the repair will fail") private boolean ignoreUnreplicatedKeyspaces = false; @@ -187,6 +190,7 @@ public class Repair extends NodeToolCmd 
options.put(RepairOption.IGNORE_UNREPLICATED_KS, Boolean.toString(ignoreUnreplicatedKeyspaces)); options.put(RepairOption.REPAIR_PAXOS_KEY, Boolean.toString(!skipPaxos && getPreviewKind() == PreviewKind.NONE)); options.put(RepairOption.PAXOS_ONLY_KEY, Boolean.toString(paxosOnly && getPreviewKind() == PreviewKind.NONE)); + options.put(RepairOption.ACCORD_ONLY_KEY, Boolean.toString(accordOnly && getPreviewKind() == PreviewKind.NONE)); if (!startToken.isEmpty() || !endToken.isEmpty()) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java index edb749fb21..cec5fdb2e5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java @@ -44,7 +44,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.AsymmetricRemoteSyncTask; -import org.apache.cassandra.repair.CassandraRepairJob; +import org.apache.cassandra.repair.RepairJob; import org.apache.cassandra.repair.LocalSyncTask; import org.apache.cassandra.repair.SyncTask; import org.apache.cassandra.repair.TreeResponse; @@ -107,7 +107,7 @@ public class OptimiseStreamsRepairTest extends TestBaseImpl { public static void install(ClassLoader cl, int id) { - new ByteBuddy().rebase(CassandraRepairJob.class) + new ByteBuddy().rebase(RepairJob.class) .method(named("createOptimisedSyncingSyncTasks").and(takesArguments(1))) .intercept(MethodDelegation.to(BBHelper.class)) .make() diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java new file mode 100644 index 
0000000000..66cf1c0e00 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.local.CommandsForKey; +import accord.impl.SimpleProgressLog; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.SafeCommand; +import accord.local.Status; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.async.AsyncChains; +import 
org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.AccordSafeCommandStore; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.accord.api.PartitionKey; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + +import static java.lang.String.format; + +public class AccordIncrementalRepairTest extends AccordTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(AccordIncrementalRepairTest.class); + + public static class BarrierRecordingAgent extends AccordAgent + { + static class ExecutedBarrier + { + final Seekables<?, ?> keysOrRanges; + final @Nonnull Timestamp executeAt; + + public ExecutedBarrier(Seekables<?, ?> keysOrRanges, @Nonnull Timestamp executeAt) + { + this.keysOrRanges = keysOrRanges; + this.executeAt = executeAt; + } + + @Override + public String toString() + { + return "ExecutedBarrier{" + + "keysOrRanges=" + keysOrRanges + + ", executeAt=" + executeAt + + '}'; + } + } + + private final List<ExecutedBarrier> barriers = new ArrayList<>(); + + @Override + public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull 
Timestamp executeAt) + { + super.onLocalBarrier(keysOrRanges, executeAt); + synchronized (barriers) + { + barriers.add(new ExecutedBarrier(keysOrRanges, executeAt)); + } + } + + public List<ExecutedBarrier> executedBarriers() + { + synchronized (barriers) + { + return ImmutableList.copyOf(barriers); + } + } + + public void reset() + { + synchronized (barriers) + { + barriers.clear(); + } + } + + } + + static BarrierRecordingAgent agent() + { + AccordService service = (AccordService) AccordService.instance(); + return (BarrierRecordingAgent) service.node().agent(); + } + + static AccordService accordService() + { + return (AccordService) AccordService.instance(); + } + + @Override + protected Logger logger() + { + return logger; + } + + @BeforeClass + public static void setupClass() throws Throwable + { + CassandraRelevantProperties.ACCORD_AGENT_CLASS.setString(BarrierRecordingAgent.class.getName()); +// setupCluster(opt -> opt.withConfig(conf -> conf.with(Feature.NETWORK, Feature.GOSSIP)), 3); + setupCluster(opt -> opt, 3); + } + + @After + public void tearDown() + { + SHARED_CLUSTER.filters().reset(); + } + + private static void await(IInvokableInstance instance, IIsolatedExecutor.SerializableCallable<Boolean> check, long duration, TimeUnit unit) + { + instance.runOnInstance(() -> { + long timeout = Clock.Global.currentTimeMillis() + unit.toMillis(duration); + while (Clock.Global.currentTimeMillis() < timeout) + { + if (check.call()) + return; + + try + { + Thread.sleep(1); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } + throw new AssertionError("Timed out waiting for condition to become true"); + }); + } + + private static void awaitEndpointUp(IInvokableInstance instance, IInvokableInstance waitOn) + { + InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(waitOn.broadcastAddress()); + await(instance, () -> FailureDetector.instance.isAlive(endpoint), 1, TimeUnit.MINUTES); + } + + private static void 
awaitEndpointDown(IInvokableInstance instance, IInvokableInstance waitOn) + { + InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(waitOn.broadcastAddress()); + await(instance, () -> !FailureDetector.instance.isAlive(endpoint), 1, TimeUnit.MINUTES); + } + + private static <V> V getUninterruptibly(Future<V> future, long timeout, TimeUnit units) + { + try + { + return future.get(timeout, units); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(e); + } + catch (ExecutionException | TimeoutException e) + { + throw new RuntimeException(e); + } + } + + private static <V> V getUninterruptibly(Future<V> future) + { + return getUninterruptibly(future, 1, TimeUnit.MINUTES); + } + + private static TxnId awaitLocalApplyOnKey(PartitionKey key) + { + Node node = accordService().node(); + AtomicReference<TxnId> waitFor = new AtomicReference<>(null); + AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> { + AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore; + CommandsForKey commands = store.maybeCommandsForKey(key).current(); + int size = commands.size(); + if (size < 1) + return; + // if txnId is an instance of CommandsForKey.TxnInfo, copying it into a + // new txnId instance will prevent any issues related to TxnInfo#hashCode + waitFor.set(new TxnId(commands.txnId(size - 1))); + })); + Assert.assertNotNull(waitFor.get()); + TxnId txnId = waitFor.get(); + long start = Clock.Global.currentTimeMillis(); + AtomicBoolean applied = new AtomicBoolean(false); + while (!applied.get()) + { + long now = Clock.Global.currentTimeMillis(); + if (now - start > TimeUnit.MINUTES.toMillis(1)) + throw new AssertionError("Timeout"); + AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> { + SafeCommand command = safeStore.get(txnId, key.toUnseekable()); 
+ Assert.assertNotNull(command.current()); + if (command.current().status().hasBeen(Status.Applied)) + applied.set(true); + })); + } + return txnId; + } + + @Test + public void txnRepairTest() throws Throwable + { + SHARED_CLUSTER.schemaChange(format("CREATE TABLE %s.%s (k int primary key, v int) WITH transactional_mode='full' AND fast_path={'size':2};", KEYSPACE, tableName)); + final String keyspace = KEYSPACE; + final String table = tableName; + + SHARED_CLUSTER.filters().allVerbs().to(3).drop(); + awaitEndpointDown(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3)); + + executeWithRetry(SHARED_CLUSTER, format("BEGIN TRANSACTION\n" + + "INSERT INTO %s (k, v) VALUES (1, 1);\n" + + "COMMIT TRANSACTION", qualifiedTableName)); + + SHARED_CLUSTER.get(1, 2).forEach(instance -> instance.runOnInstance(() -> { + TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table); + awaitLocalApplyOnKey(new PartitionKey(metadata.id, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(1)))); + })); + + SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> agent().reset())); + + SHARED_CLUSTER.get(1, 2).forEach(instance -> { + instance.runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + Assert.assertFalse(cfs.getLiveSSTables().isEmpty()); + cfs.getLiveSSTables().forEach(sstable -> { + Assert.assertFalse(sstable.isRepaired()); + Assert.assertFalse(sstable.isPendingRepair()); + }); + }); + }); + SHARED_CLUSTER.get(3).runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + Assert.assertTrue(cfs.getLiveSSTables().isEmpty()); + }); + + // heal partition and wait for node 1 to see node 3 again + for (IInvokableInstance instance : SHARED_CLUSTER) + instance.runOnInstance(() -> { + SimpleProgressLog.PAUSE_FOR_TEST = true; + 
Assert.assertTrue(agent().executedBarriers().isEmpty()); + }); + SHARED_CLUSTER.filters().reset(); + awaitEndpointUp(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3)); + SHARED_CLUSTER.get(1).nodetool("repair", KEYSPACE); + + SHARED_CLUSTER.forEach(instance -> { + instance.runOnInstance(() -> { + Assert.assertFalse( agent().executedBarriers().isEmpty()); + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + Assert.assertFalse(cfs.getLiveSSTables().isEmpty()); + cfs.getLiveSSTables().forEach(sstable -> { + Assert.assertTrue(sstable.isRepaired() || sstable.isPendingRepair()); + }); + }); + }); + } + + private void testSingleNodeWrite(TransactionalMode mode) + { + SHARED_CLUSTER.schemaChange(format("CREATE TABLE %s.%s (k int primary key, v int) WITH transactional_mode='%s';", KEYSPACE, tableName, mode)); + final String keyspace = KEYSPACE; + final String table = tableName; + + SHARED_CLUSTER.get(3).runOnInstance(() -> { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 2);", keyspace, table)); + }); + + SHARED_CLUSTER.get(3).runOnInstance(() -> { + UntypedResultSet result = QueryProcessor.executeInternal(format("SELECT * FROM %s.%s WHERE k=1", keyspace, table)); + Assert.assertFalse(result.isEmpty()); + UntypedResultSet.Row row = Iterables.getOnlyElement(result); + Assert.assertEquals(1, row.getInt("k")); + Assert.assertEquals(2, row.getInt("v")); + + + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + Assert.assertFalse(cfs.getLiveSSTables().isEmpty()); + cfs.getLiveSSTables().forEach(sstable -> { + Assert.assertFalse(sstable.isRepaired()); + Assert.assertFalse(sstable.isPendingRepair()); + }); + }); + SHARED_CLUSTER.get(1, 2).forEach(instance -> instance.runOnInstance(() -> { + UntypedResultSet result = QueryProcessor.executeInternal(format("SELECT * FROM %s.%s WHERE k=1", keyspace, table)); + 
Assert.assertTrue(result.isEmpty()); + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + Assert.assertTrue(cfs.getLiveSSTables().isEmpty()); + })); + SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> { + agent().reset(); + })); + + SHARED_CLUSTER.get(1).nodetool("repair", KEYSPACE); + SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> { + Assert.assertFalse( agent().executedBarriers().isEmpty()); + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + Assert.assertFalse(cfs.getLiveSSTables().isEmpty()); + cfs.getLiveSSTables().forEach(sstable -> { + Assert.assertTrue(sstable.isRepaired() || sstable.isPendingRepair()); + }); + + UntypedResultSet result = QueryProcessor.executeInternal(format("SELECT * FROM %s.%s WHERE k=1", keyspace, table)); + Assert.assertFalse(result.isEmpty()); + UntypedResultSet.Row row = Iterables.getOnlyElement(result); + Assert.assertEquals(1, row.getInt("k")); + Assert.assertEquals(2, row.getInt("v")); + })); + } + + /** + * a failed write at txn mode unsafe should be made visible by repair + */ + @Test + public void unsafeRepairTest() + { + testSingleNodeWrite(TransactionalMode.unsafe); + } + + /** + * Repair should repair (fully replicate _some_ state) any divergent state between replicas + */ + @Test + public void fullRepairTest() + { + testSingleNodeWrite(TransactionalMode.full); + } + + @Test + public void onlyAccordTest() + { + SHARED_CLUSTER.schemaChange(format("CREATE TABLE %s.%s (k int primary key, v int) WITH transactional_mode='full' AND fast_path={'size':2};", KEYSPACE, tableName)); + final String keyspace = KEYSPACE; + final String table = tableName; + + SHARED_CLUSTER.filters().allVerbs().to(3).drop(); + awaitEndpointDown(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3)); + awaitEndpointDown(SHARED_CLUSTER.get(2), SHARED_CLUSTER.get(3)); + + executeWithRetry(SHARED_CLUSTER, 
format("BEGIN TRANSACTION\n" + + "INSERT INTO %s (k, v) VALUES (1, 1);\n" + + "COMMIT TRANSACTION", qualifiedTableName)); + + SHARED_CLUSTER.get(1, 2).forEach(instance -> instance.runOnInstance(() -> { + TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table); + awaitLocalApplyOnKey(new PartitionKey(metadata.id, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(1)))); + })); + + SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> agent().reset())); + + SHARED_CLUSTER.filters().reset(); + awaitEndpointUp(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3)); + SHARED_CLUSTER.get(1).nodetool("repair", "--accord-only", KEYSPACE); + + SHARED_CLUSTER.forEach(instance -> { + logger().info("checking instance {}", instance.broadcastAddress()); + instance.runOnInstance(() -> { + Assert.assertFalse( agent().executedBarriers().isEmpty()); + }); + }); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java index f568d982f4..519ea15f70 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java @@ -570,7 +570,7 @@ public class AccordMigrationTest extends AccordTestBase assertTargetPaxosWrite(runCasNoApply, 1, paxosMigratingKeys.next(), 2, 1, 1, 1, 1); // Repair the currently migrating range from when targets were switched, but it's not an Accord repair, this is to make sure the wrong repair type doesn't trigger progress - nodetool(coordinator, "repair", "-st", upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString()); + nodetool(coordinator, "repair", "-st", upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString(), "--paxos-only"); assertMigrationState(tableName, ConsensusMigrationTarget.paxos, ImmutableList.of(new Range(minToken, midToken), new 
Range(maxToken, minToken)), ImmutableList.of(accordMigratingRange), 1); // Paxos migrating keys should still need key migration after non-Accord repair diff --git a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java index b04df4b154..bf75f920ed 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java +++ b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java @@ -97,7 +97,7 @@ class OnInstanceRepair extends ClusterAction { Collection<Range<Token>> ranges = rangesSupplier.call(); // no need to wait for completion, as we track all task submissions and message exchanges, and ensure they finish before continuing to next action - StorageService.instance.repair(keyspaceName, new RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, ranges, false, force, PreviewKind.NONE, false, true, repairPaxos, repairOnlyPaxos, false), singletonList((tag, event) -> { + StorageService.instance.repair(keyspaceName, new RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, ranges, false, force, PreviewKind.NONE, false, true, repairPaxos, repairOnlyPaxos, false, false), singletonList((tag, event) -> { if (event.getType() == ProgressEventType.COMPLETE) listener.run(); })); diff --git a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java index 6caec1e8ec..27152901c3 100644 --- a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java +++ b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java @@ -66,12 +66,7 @@ public class FailingRepairFuzzTest extends FuzzTestBase Cluster.Node coordinator = coordinatorGen.next(rs); // exclude accord repair as this test breaks validation/sync; which accord doesn't have - RepairOption options; - do - { - options = repairOption(rs, 
coordinator, KEYSPACE, TABLES); - } - while (options.accordRepair()); + RepairOption options = repairOption(rs, coordinator, KEYSPACE, TABLES); RepairCoordinator repair = coordinator.repair(KEYSPACE, options, false); repair.run(); InetAddressAndPort failingAddress = pickParticipant(rs, coordinator, repair); diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java index ea57c1b790..3cf67a6024 100644 --- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java +++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java @@ -382,26 +382,14 @@ public abstract class FuzzTestBase extends CQLTester.InMemory for (JobState job : session.getJobs()) { EnumSet<JobState.State> expected = EnumSet.allOf(JobState.State.class); - if (repair.state.options.accordRepair()) + if (!shouldSnapshot) { - // accord doesn't do snapshot, validation, or streaming expected.remove(JobState.State.SNAPSHOT_START); expected.remove(JobState.State.SNAPSHOT_COMPLETE); - expected.remove(JobState.State.VALIDATION_START); - expected.remove(JobState.State.VALIDATION_COMPLETE); - expected.remove(JobState.State.STREAM_START); } - else + if (!shouldSync) { - if (!shouldSnapshot) - { - expected.remove(JobState.State.SNAPSHOT_START); - expected.remove(JobState.State.SNAPSHOT_COMPLETE); - } - if (!shouldSync) - { - expected.remove(JobState.State.STREAM_START); - } + expected.remove(JobState.State.STREAM_START); } Set<JobState.State> actual = job.getStateTimesMillis().keySet(); Assertions.assertThat(actual).isEqualTo(expected); diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java index 1c26105909..2832368ecb 100644 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -113,7 +113,7 @@ public class RepairJobTest private static InetAddressAndPort addr4; private static InetAddressAndPort addr5; 
private MeasureableRepairSession session; - private CassandraRepairJob job; + private RepairJob job; private RepairJobDesc sessionJobDesc; // So that threads actually get recycled and we can have accurate memory accounting while testing @@ -125,11 +125,11 @@ public class RepairJobTest public MeasureableRepairSession(TimeUUID parentRepairSession, CommonRange commonRange, boolean excludedDeadNodes, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, PreviewKind previewKind, boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, - boolean accordRepair, String... cfnames) + String... cfnames) { super(SharedContext.Global.instance, new Scheduler.NoopScheduler(), parentRepairSession, commonRange, excludedDeadNodes, keyspace, parallelismDegree, isIncremental, pullRepair, - previewKind, optimiseStreams, repairPaxos, paxosOnly, accordRepair, cfnames); + previewKind, optimiseStreams, repairPaxos, paxosOnly, false, false, cfnames); } @Override @@ -195,9 +195,9 @@ public class RepairJobTest this.session = new MeasureableRepairSession(parentRepairSession, new CommonRange(neighbors, emptySet(), FULL_RANGE), false, KEYSPACE, SEQUENTIAL, false, false, - NONE, false, true, false, false, CF); + NONE, false, true, false, CF); - this.job = new CassandraRepairJob(session, CF); + this.job = new RepairJob(session, CF); this.sessionJobDesc = new RepairJobDesc(session.state.parentRepairSession, session.getId(), session.state.keyspace, CF, session.ranges()); @@ -267,7 +267,7 @@ public class RepairJobTest // Use addr4 instead of one of the provided trees to force everything to be remote sync tasks as // LocalSyncTasks try to reach over the network. 
- List<SyncTask> syncTasks = CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, sessionJobDesc, mockTreeResponses, + List<SyncTask> syncTasks = RepairJob.createStandardSyncTasks(SharedContext.Global.instance, sessionJobDesc, mockTreeResponses, addr4, // local noTransient(), session.isIncremental, @@ -367,7 +367,7 @@ public class RepairJobTest treeResponse(addr2, RANGE_1, "different", RANGE_2, "same", RANGE_3, "different"), treeResponse(addr3, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr1, // local noTransient(), // transient @@ -403,7 +403,7 @@ public class RepairJobTest List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"), treeResponse(addr2, RANGE_1, "different", RANGE_2, "same", RANGE_3, "different")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr1, // local transientPredicate(addr2), @@ -433,7 +433,7 @@ public class RepairJobTest List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"), treeResponse(addr2, RANGE_1, "different", RANGE_2, "same", RANGE_3, "different")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr1, // local transientPredicate(addr1), @@ -493,7 +493,7 @@ public class RepairJobTest 
List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"), treeResponse(addr2, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, local, // local isTransient, @@ -511,13 +511,13 @@ public class RepairJobTest treeResponse(addr2, RANGE_1, "two", RANGE_2, "two", RANGE_3, "two"), treeResponse(addr3, RANGE_1, "three", RANGE_2, "three", RANGE_3, "three")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr1, // local ep -> ep.equals(addr3), // transient - false, - true, - PreviewKind.ALL)); + false, + true, + PreviewKind.ALL)); assertThat(tasks).hasSize(3); @@ -542,7 +542,7 @@ public class RepairJobTest treeResponse(addr5, RANGE_1, "five", RANGE_2, "five", RANGE_3, "five")); Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr1, // local isTransient, // transient @@ -609,7 +609,7 @@ public class RepairJobTest treeResponse(addr5, RANGE_1, "five", RANGE_2, "five", RANGE_3, "five")); Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, local, // local isTransient, // transient @@ -658,13 +658,13 @@ public class RepairJobTest treeResponse(addr4, RANGE_1, "four", RANGE_2, "four", RANGE_3, "four"), treeResponse(addr5, RANGE_1, "five", RANGE_2, "five", RANGE_3, "five")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr4, // local ep -> ep.equals(addr4) || ep.equals(addr5), // transient - false, - pullRepair, - PreviewKind.ALL)); + false, + pullRepair, + PreviewKind.ALL)); assertThat(tasks.get(pair(addr4, addr5))).isNull(); } @@ -676,13 +676,13 @@ public class RepairJobTest treeResponse(addr2, RANGE_1, "two", RANGE_2, "two", RANGE_3, "two"), treeResponse(addr3, RANGE_1, "three", RANGE_2, "three", RANGE_3, "three")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr1, // local noTransient(), addr -> "DC1", - false, - PreviewKind.ALL)); + false, + PreviewKind.ALL)); for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2), pair(addr1, addr3), @@ -711,13 +711,13 @@ public class RepairJobTest treeResponse(addr2, RANGE_1, "one", RANGE_2, "two"), treeResponse(addr3, RANGE_1, "three", RANGE_2, "two")); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, JOB_DESC, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, JOB_DESC, treeResponses, addr4, // local noTransient(), addr -> "DC1", - false, - PreviewKind.ALL)); + false, + 
PreviewKind.ALL)); SyncTaskListAssert.assertThat(tasks.values()).areAllInstanceOf(AsymmetricRemoteSyncTask.class); @@ -744,13 +744,13 @@ public class RepairJobTest treeResponse(addr3, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same")); RepairJobDesc desc = new RepairJobDesc(nextTimeUUID(), nextTimeUUID(), "ks", "cf", Collections.emptyList()); - Map<SyncNodePair, SyncTask> tasks = toMap(CassandraRepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, desc, + Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, desc, treeResponses, addr1, // local ep -> ep.equals(addr3), addr -> "DC1", - false, - PreviewKind.ALL)); + false, + PreviewKind.ALL)); SyncTask task = tasks.get(pair(addr1, addr2)); diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 5ef72a9e58..c59be70c7b 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -67,7 +67,7 @@ public class RepairSessionTest new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)), false, "Keyspace1", RepairParallelism.SEQUENTIAL, false, false, PreviewKind.NONE, false, - false, false, false, "Standard1"); + false, false, false, false, "Standard1"); // perform convict session.convict(remote, Double.MAX_VALUE); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java index 1630b3c7c3..143f63a26c 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java @@ -38,7 +38,6 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import static org.apache.cassandra.service.accord.AccordTestUtils.*; -import static 
org.apache.cassandra.service.accord.AccordTopologyTest.token; public class AccordFastPathCoordinatorTest { diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index 6d37f962ac..a608f41f8d 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -81,6 +81,9 @@ import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.TransactionStatement; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.metrics.AccordStateCacheMetrics; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; @@ -478,4 +481,20 @@ public class AccordTestUtils { return Arrays.stream(ids).mapToObj(AccordTestUtils::id).collect(Collectors.toSet()); } + + public static Token token(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + public static Range<Token> range(Token left, Token right) + { + return new Range<>(left, right); + } + + public static Range<Token> range(long left, long right) + { + return range(token(left), token(right)); + } + } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java index e3ddf6eeb5..a0ada37645 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java @@ -18,15 +18,11 @@ package org.apache.cassandra.service.accord; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import 
com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -40,40 +36,20 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Keyspaces; -import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.Location; -import org.apache.cassandra.tcm.membership.NodeAddresses; -import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.tcm.membership.NodeState; -import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.ownership.DataPlacement; -import org.apache.cassandra.tcm.ownership.DataPlacements; import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; +import static org.apache.cassandra.service.accord.AccordTopologyUtils.*; public class AccordTopologyTest { - private static final Id ID1 = new Id(1); - private static final Id ID2 = new Id(2); - private static final Id ID3 = new Id(3); - private static final List<Id> NODE_LIST = ImmutableList.of(ID1, ID2, ID3); - private static final Set<Id> NODE_SET = ImmutableSet.copyOf(NODE_LIST); - - private static final InetAddressAndPort EP1 = ep(1); - private static final InetAddressAndPort EP2 = ep(2); - private static final InetAddressAndPort EP3 = 
ep(3); - private static final IPartitioner partitioner = Murmur3Partitioner.instance; private static TableId tableId = null; private static KeyspaceMetadata keyspace = null; @@ -89,75 +65,6 @@ public class AccordTopologyTest keyspace = KeyspaceMetadata.create("ks", KeyspaceParams.simple(3), Tables.of(table)); } - private static InetAddressAndPort ep(int i) - { - try - { - return InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(new byte[]{127, 0, 0, (byte)i}), 7012); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - - private static NodeId nodeId(int id) - { - return new NodeId(id); - } - - static void addNode(ClusterMetadata.Transformer transformer, int node, Token token) - { - NodeId nodeId = nodeId(node); - InetAddressAndPort ep = ep(node); - NodeAddresses addresses = new NodeAddresses(nodeId.toUUID(), ep, ep, ep); - transformer.register(nodeId, addresses, LOCATION, NodeVersion.CURRENT); - transformer.withNodeState(nodeId, NodeState.JOINED); - transformer.proposeToken(nodeId, Collections.singleton(token)); - transformer.addToRackAndDC(nodeId); - } - - private static ClusterMetadata configureCluster(List<Range<Token>> ranges, Keyspaces keyspaces) - { - assert ranges.size() == 3; - - IPartitioner partitioner = Murmur3Partitioner.instance; - ClusterMetadata empty = new ClusterMetadata(partitioner); - ClusterMetadata.Transformer transformer = empty.transformer(); - transformer.with(new DistributedSchema(Keyspaces.of(keyspace))); - addNode(transformer, 1, ranges.get(0).right); - addNode(transformer, 2, ranges.get(1).right); - addNode(transformer, 3, ranges.get(2).right); - ClusterMetadata metadata = transformer.build().metadata; - - for (KeyspaceMetadata keyspace : keyspaces) - { - ReplicationParams replication = keyspace.params.replication; - AbstractReplicationStrategy strategy = AbstractReplicationStrategy.createReplicationStrategy(keyspace.name, replication); - DataPlacements.Builder placements = 
metadata.placements.unbuild(); - DataPlacement placement = strategy.calculateDataPlacement(Epoch.EMPTY, metadata.tokenMap.toRanges(), metadata); - placements.with(replication, placement); - metadata = transformer.with(placements.build()).build().metadata; - } - - return metadata; - } - - static Token token(long t) - { - return new Murmur3Partitioner.LongToken(t); - } - - static Range<Token> range(Token left, Token right) - { - return new Range<>(left, right); - } - - static Range<Token> range(long left, long right) - { - return range(token(left), token(right)); - } - /** * Check converter does the right thing if the ring is constructed with min and max tokens */ diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java new file mode 100644 index 0000000000..c3c8e6b588 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import accord.local.Node; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.DataPlacements; + +public class AccordTopologyUtils +{ + public static final Node.Id ID1 = new Node.Id(1); + public static final Node.Id ID2 = new Node.Id(2); + public static final Node.Id ID3 = new Node.Id(3); + public static final List<Node.Id> NODE_LIST = ImmutableList.of(ID1, ID2, ID3); + public static final Set<Node.Id> NODE_SET = ImmutableSet.copyOf(NODE_LIST); + + private static final IPartitioner partitioner = Murmur3Partitioner.instance; + private static final Location LOCATION = new Location("DC1", "RACK1"); + + public static InetAddressAndPort ep(int i) + { + try + { + return InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(new byte[]{ 127, 0, 
0, (byte)i}), 7012); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + private static NodeId nodeId(int id) + { + return new NodeId(id); + } + + static void addNode(ClusterMetadata.Transformer transformer, int node, Token token) + { + NodeId nodeId = nodeId(node); + InetAddressAndPort ep = ep(node); + NodeAddresses addresses = new NodeAddresses(nodeId.toUUID(), ep, ep, ep); + transformer.register(nodeId, addresses, LOCATION, NodeVersion.CURRENT); + transformer.withNodeState(nodeId, NodeState.JOINED); + transformer.proposeToken(nodeId, Collections.singleton(token)); + transformer.addToRackAndDC(nodeId); + } + + public static ClusterMetadata configureCluster(List<Range<Token>> ranges, Keyspaces keyspaces) + { + assert ranges.size() == 3; + + IPartitioner partitioner = Murmur3Partitioner.instance; + ClusterMetadata empty = new ClusterMetadata(partitioner); + ClusterMetadata.Transformer transformer = empty.transformer(); + transformer.with(new DistributedSchema(keyspaces)); + addNode(transformer, 1, ranges.get(0).right); + addNode(transformer, 2, ranges.get(1).right); + addNode(transformer, 3, ranges.get(2).right); + ClusterMetadata metadata = transformer.build().metadata; + + for (KeyspaceMetadata keyspace : keyspaces) + { + ReplicationParams replication = keyspace.params.replication; + AbstractReplicationStrategy strategy = AbstractReplicationStrategy.createReplicationStrategy(keyspace.name, replication); + DataPlacements.Builder placements = metadata.placements.unbuild(); + DataPlacement placement = strategy.calculateDataPlacement(Epoch.EMPTY, metadata.tokenMap.toRanges(), metadata); + placements.with(replication, placement); + metadata = transformer.with(placements.build()).build().metadata; + } + + return metadata; + } + + static Token token(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + static Range<Token> range(Token left, Token right) + { + return new Range<>(left, right); + } + + public static Range<Token> 
range(long left, long right) + { + return range(token(left), token(right)); + } + +} diff --git a/test/unit/org/apache/cassandra/service/accord/repair/RequiredResponseTrackerTest.java b/test/unit/org/apache/cassandra/service/accord/repair/RequiredResponseTrackerTest.java new file mode 100644 index 0000000000..022543bb3a --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/repair/RequiredResponseTrackerTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord.repair; + +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import accord.api.TopologySorter; +import accord.coordinate.tracking.RequestStatus; +import accord.local.Node; +import accord.topology.Topologies; +import accord.topology.Topology; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.service.accord.AccordTopology; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Location; + +import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; +import static org.apache.cassandra.service.accord.AccordTopologyUtils.*; + +public class RequiredResponseTrackerTest +{ + private static final IPartitioner partitioner = Murmur3Partitioner.instance; + private static TableId tableId = null; + private static KeyspaceMetadata keyspace = null; + private static Topology topology; + private static final Location LOCATION = new Location("DC1", "RACK1"); + + private static final List<Range<Token>> RANGES = ImmutableList.of(range(-100, 0), range(0, 100), range(100, -100)); + private static final TopologySorter TOPOLOGY_SORTER = (node1, node2, shards) -> node1.compareTo(node2); + + @BeforeClass + public static void beforeClass() throws Throwable + { + DatabaseDescriptor.daemonInitialization(); + 
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + TableMetadata table = parse("CREATE TABLE tbl (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'", "ks").build(); + tableId = table.id; + keyspace = KeyspaceMetadata.create("ks", KeyspaceParams.simple(3), Tables.of(table)); + + ClusterMetadata metadata = configureCluster(RANGES, Keyspaces.of(keyspace)); + topology = AccordTopology.createAccordTopology(metadata); + + } + + @Test + public void successCase() + { + Set<Node.Id> nodes = topology.nodes(); + Assert.assertEquals(NODE_SET, nodes); + RequiredResponseTracker tracker = new RequiredResponseTracker(nodes, new Topologies.Single(TOPOLOGY_SORTER, topology)); + Assert.assertEquals(RequestStatus.NoChange, tracker.recordSuccess(ID1)); + Assert.assertEquals(RequestStatus.NoChange, tracker.recordSuccess(ID2)); + Assert.assertEquals(RequestStatus.Success, tracker.recordSuccess(ID3)); + } + + @Test + public void failureCase() + { + Set<Node.Id> nodes = topology.nodes(); + Assert.assertEquals(NODE_SET, nodes); + RequiredResponseTracker tracker = new RequiredResponseTracker(nodes, new Topologies.Single(TOPOLOGY_SORTER, topology)); + Assert.assertEquals(RequestStatus.NoChange, tracker.recordSuccess(ID1)); + Assert.assertEquals(RequestStatus.Failed, tracker.recordFailure(ID2)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org