Abstract repair for pluggable storage

Patch by Blake Eggleston; Reviewed by Jason Brown for CASSANDRA-14116
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c5a7fcaa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c5a7fcaa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c5a7fcaa Branch: refs/heads/trunk Commit: c5a7fcaa8e00083d6f74967c40474aef07b0d21a Parents: 478c1a9 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Sat Mar 10 09:16:46 2018 -0800 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Fri Apr 6 16:11:14 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 10 + src/java/org/apache/cassandra/db/Keyspace.java | 10 + .../db/compaction/CompactionManager.java | 257 +------------ .../repair/CassandraKeyspaceRepairManager.java | 48 +++ .../db/repair/CassandraTableRepairManager.java | 83 +++++ .../db/repair/CassandraValidationIterator.java | 321 ++++++++++++++++ .../db/repair/PendingAntiCompaction.java | 205 +++++++++++ .../cassandra/repair/KeyspaceRepairManager.java | 42 +++ .../repair/RepairMessageVerbHandler.java | 19 +- .../cassandra/repair/TableRepairManager.java | 64 ++++ .../cassandra/repair/ValidationManager.java | 163 +++++++++ .../repair/ValidationPartitionIterator.java | 33 ++ .../repair/consistent/ConsistentSession.java | 8 +- .../repair/consistent/LocalSessions.java | 42 +-- .../consistent/PendingAntiCompaction.java | 202 ---------- .../cassandra/service/ActiveRepairService.java | 48 +-- ...tionManagerGetSSTablesForValidationTest.java | 179 --------- .../LeveledCompactionStrategyTest.java | 4 +- ...tionManagerGetSSTablesForValidationTest.java | 182 +++++++++ .../db/repair/PendingAntiCompactionTest.java | 366 +++++++++++++++++++ .../apache/cassandra/repair/ValidatorTest.java | 2 +- .../repair/consistent/LocalSessionTest.java | 46 ++- .../consistent/PendingAntiCompactionTest.java | 365 ------------------ 
.../service/ActiveRepairServiceTest.java | 12 +- 25 files changed, 1603 insertions(+), 1109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b3617c3..3db7f27 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Abstract repair for pluggable storage (CASSANDRA-14116) * Add meaningful toString() impls (CASSANDRA-13653) * Add sstableloader option to accept target keyspace name (CASSANDRA-13884) * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 4f74667..e4b84fe 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -51,6 +51,7 @@ import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.streaming.CassandraStreamManager; +import org.apache.cassandra.db.repair.CassandraTableRepairManager; import org.apache.cassandra.db.view.TableViews; import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.partitions.CachedPartition; @@ -75,6 +76,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.metrics.TableMetrics.Sampler; +import org.apache.cassandra.repair.TableRepairManager; import org.apache.cassandra.schema.*; import 
org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.CacheService; @@ -216,6 +218,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private final CassandraStreamManager streamManager; + private final TableRepairManager repairManager; + private volatile boolean compactionSpaceCheck = true; @VisibleForTesting @@ -447,6 +451,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean oldMBeanName= null; } streamManager = new CassandraStreamManager(this); + repairManager = new CassandraTableRepairManager(this); } public void updateSpeculationThreshold() @@ -466,6 +471,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return streamManager; } + public TableRepairManager getRepairManager() + { + return repairManager; + } + public TableMetadata metadata() { return metadata.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index ae778f1..42d43b2 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.repair.CassandraKeyspaceRepairManager; import org.apache.cassandra.db.view.ViewManager; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.index.Index; @@ -45,6 +46,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import 
org.apache.cassandra.metrics.KeyspaceMetrics; +import org.apache.cassandra.repair.KeyspaceRepairManager; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.schema.Schema; @@ -90,6 +92,7 @@ public class Keyspace private volatile AbstractReplicationStrategy replicationStrategy; public final ViewManager viewManager; private volatile ReplicationParams replicationParams; + private final KeyspaceRepairManager repairManager; public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>() { @@ -335,6 +338,7 @@ public class Keyspace initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables); } this.viewManager.reload(false); + this.repairManager = new CassandraKeyspaceRepairManager(this); } private Keyspace(KeyspaceMetadata metadata) @@ -343,6 +347,12 @@ public class Keyspace createReplicationStrategy(metadata); this.metric = new KeyspaceMetrics(this); this.viewManager = new ViewManager(this); + this.repairManager = new CassandraKeyspaceRepairManager(this); + } + + public KeyspaceRepairManager getRepairManager() + { + return repairManager; } public static Keyspace mockKS(KeyspaceMetadata metadata) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 23b881a..5672dfe 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -41,6 +41,7 @@ import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import 
org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.repair.ValidationPartitionIterator; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -132,8 +133,7 @@ public class CompactionManager implements CompactionManagerMBean private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE); - @VisibleForTesting - CompactionMetrics getMetrics() + public CompactionMetrics getMetrics() { return metrics; } @@ -941,30 +941,9 @@ public class CompactionManager implements CompactionManagerMBean return null; } - /** - * Does not mutate data, so is not scheduled. - */ - public Future<?> submitValidation(final ColumnFamilyStore cfStore, final Validator validator) + public Future<?> submitValidation(Callable<Object> validation) { - Callable<Object> callable = new Callable<Object>() - { - public Object call() throws IOException - { - try (TableMetrics.TableTimer.Context c = cfStore.metric.validationTime.time()) - { - doValidationCompaction(cfStore, validator); - } - catch (Throwable e) - { - // we need to inform the remote end of our failure, otherwise it will hang on repair forever - validator.fail(); - throw e; - } - return this; - } - }; - - return validationExecutor.submitIfRunning(callable, "validation"); + return validationExecutor.submitIfRunning(validation, "validation"); } /* Used in tests. */ @@ -1315,192 +1294,6 @@ public class CompactionManager implements CompactionManagerMBean txn); } - - /** - * Performs a readonly "compaction" of all sstables in order to validate complete rows, - * but without writing the merge result - */ - @SuppressWarnings("resource") - private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator) throws IOException - { - // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped - // mid-validation, or to attempt to validate a droped CFS. 
this is just a best effort to avoid useless work, - // particularly in the scenario where a validation is submitted before the drop, and there are compactions - // started prior to the drop keeping some sstables alive. Since validationCompaction can run - // concurrently with other compactions, it would otherwise go ahead and scan those again. - if (!cfs.isValid()) - return; - - Refs<SSTableReader> sstables = null; - try - { - UUID parentRepairSessionId = validator.desc.parentSessionId; - String snapshotName; - boolean isGlobalSnapshotValidation = cfs.snapshotExists(parentRepairSessionId.toString()); - if (isGlobalSnapshotValidation) - snapshotName = parentRepairSessionId.toString(); - else - snapshotName = validator.desc.sessionId.toString(); - boolean isSnapshotValidation = cfs.snapshotExists(snapshotName); - - if (isSnapshotValidation) - { - // If there is a snapshot created for the session then read from there. - // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we - // are supposed to validate. - sstables = cfs.getSnapshotSSTableReaders(snapshotName); - } - else - { - if (!validator.isIncremental) - { - // flush first so everyone is validating data that is as similar as possible - StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); - } - sstables = getSSTablesToValidate(cfs, validator); - if (sstables == null) - return; // this means the parent repair session was removed - the repair session failed on another node and we removed it - } - - // Create Merkle trees suitable to hold estimated partitions for the given ranges. - // We blindly assume that a partition is evenly distributed on all sstables for now. 
- MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs); - long start = System.nanoTime(); - long partitionCount = 0; - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges); - ValidationCompactionController controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, validator.nowInSec)); - CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, validator.nowInSec, metrics)) - { - // validate the CF as we iterate over it - validator.prepare(cfs, tree); - while (ci.hasNext()) - { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - try (UnfilteredRowIterator partition = ci.next()) - { - validator.add(partition); - partitionCount++; - } - } - validator.complete(); - } - finally - { - if (isSnapshotValidation && !isGlobalSnapshotValidation) - { - // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction - // is done). 
- cfs.clearSnapshot(snapshotName); - } - cfs.metric.partitionsValidated.update(partitionCount); - } - long estimatedTotalBytes = 0; - for (SSTableReader sstable : sstables) - { - for (Pair<Long, Long> positionsForRanges : sstable.getPositionsForRanges(validator.desc.ranges)) - estimatedTotalBytes += positionsForRanges.right - positionsForRanges.left; - } - cfs.metric.bytesValidated.update(estimatedTotalBytes); - if (logger.isDebugEnabled()) - { - long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - logger.debug("Validation of {} partitions (~{}) finished in {} msec, for {}", - partitionCount, - FBUtilities.prettyPrintMemory(estimatedTotalBytes), - duration, - validator.desc); - } - } - finally - { - if (sstables != null) - sstables.release(); - } - } - - private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs) - { - MerkleTrees tree = new MerkleTrees(cfs.getPartitioner()); - long allPartitions = 0; - Map<Range<Token>, Long> rangePartitionCounts = Maps.newHashMapWithExpectedSize(ranges.size()); - for (Range<Token> range : ranges) - { - long numPartitions = 0; - for (SSTableReader sstable : sstables) - numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range)); - rangePartitionCounts.put(range, numPartitions); - allPartitions += numPartitions; - } - - for (Range<Token> range : ranges) - { - long numPartitions = rangePartitionCounts.get(range); - double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0; - // determine max tree depth proportional to range size to avoid blowing up memory with multiple tress, - // capping at 20 to prevent large tree (CASSANDRA-11390) - int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0; - // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263) - int depth = numPartitions > 0 ? 
(int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0; - tree.addMerkleTree((int) Math.pow(2, depth), range); - } - if (logger.isDebugEnabled()) - { - // MT serialize may take time - logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0)); - } - - return tree; - } - - @VisibleForTesting - synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator) - { - Refs<SSTableReader> sstables; - - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); - if (prs == null) - return null; - Set<SSTableReader> sstablesToValidate = new HashSet<>(); - - com.google.common.base.Predicate<SSTableReader> predicate; - if (prs.isPreview()) - { - predicate = prs.getPreviewPredicate(); - - } - else if (validator.isIncremental) - { - predicate = s -> validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair); - } - else - { - // note that we always grab all existing sstables for this - if we were to just grab the ones that - // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream - predicate = (s) -> !prs.isIncremental || !s.isRepaired(); - } - - try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate))) - { - for (SSTableReader sstable : sstableCandidates.sstables) - { - if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges)) - { - sstablesToValidate.add(sstable); - } - } - - sstables = Refs.tryRef(sstablesToValidate); - if (sstables == null) - { - logger.error("Could not reference sstables"); - throw new RuntimeException("Could not reference sstables"); - } - } - - return sstables; - } - /** * Splits up an sstable into two new 
sstables. The first of the new tables will store repaired ranges, the second * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted @@ -1702,45 +1495,6 @@ public class CompactionManager implements CompactionManagerMBean return cfs.isIndex() ? nowInSec : cfs.gcBefore(nowInSec); } - private static class ValidationCompactionIterator extends CompactionIterator - { - public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, CompactionMetrics metrics) - { - super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics); - } - } - - /* - * Controller for validation compaction that always purges. - * Note that we should not call cfs.getOverlappingSSTables on the provided - * sstables because those sstables are not guaranteed to be active sstables - * (since we can run repair on a snapshot). - */ - private static class ValidationCompactionController extends CompactionController - { - public ValidationCompactionController(ColumnFamilyStore cfs, int gcBefore) - { - super(cfs, gcBefore); - } - - @Override - public LongPredicate getPurgeEvaluator(DecoratedKey key) - { - /* - * The main reason we always purge is that including gcable tombstone would mean that the - * repair digest will depends on the scheduling of compaction on the different nodes. This - * is still not perfect because gcbefore is currently dependend on the current time at which - * the validation compaction start, which while not too bad for normal repair is broken for - * repair on snapshots. A better solution would be to agree on a gcbefore that all node would - * use, and we'll do that with CASSANDRA-4932. - * Note validation compaction includes all sstables, so we don't have the problem of purging - * a tombstone that could shadow a column in another sstable, but this is doubly not a concern - * since validation compaction is read-only. 
- */ - return time -> true; - } - } - public ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task) { return viewBuildExecutor.submitIfRunning(() -> { @@ -1856,7 +1610,8 @@ public class CompactionManager implements CompactionManagerMBean } } - private static class ValidationExecutor extends CompactionExecutor + // TODO: pull out relevant parts of CompactionExecutor and move to ValidationManager + public static class ValidationExecutor extends CompactionExecutor { public ValidationExecutor() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java new file mode 100644 index 0000000..5f2e5a0 --- /dev/null +++ b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.repair; + +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.KeyspaceRepairManager; + +public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager +{ + private final Keyspace keyspace; + + public CassandraKeyspaceRepairManager(Keyspace keyspace) + { + this.keyspace = keyspace; + } + + @Override + public ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) + { + PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, ranges, executor); + return pac.run(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java new file mode 100644 index 0000000..983e30f --- /dev/null +++ b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.repair; + +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import com.google.common.base.Predicate; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.TableRepairManager; +import org.apache.cassandra.repair.ValidationPartitionIterator; +import org.apache.cassandra.repair.Validator; + +public class CassandraTableRepairManager implements TableRepairManager +{ + private final ColumnFamilyStore cfs; + + public CassandraTableRepairManager(ColumnFamilyStore cfs) + { + this.cfs = cfs; + } + + @Override + public ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> ranges, UUID parentId, UUID sessionID, boolean isIncremental, int nowInSec) throws IOException + { + return new CassandraValidationIterator(cfs, ranges, parentId, sessionID, isIncremental, nowInSec); + } + + @Override + public Future<?> submitValidation(Callable<Object> validation) + { + return CompactionManager.instance.submitValidation(validation); + } + + @Override + public void incrementalSessionCompleted(UUID sessionID) + { + CompactionManager.instance.submitBackground(cfs); + } + + @Override + public synchronized void snapshot(String name, 
Collection<Range<Token>> ranges, boolean force) + { + if (force || !cfs.snapshotExists(name)) + { + cfs.snapshot(name, new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader sstable) + { + return sstable != null && + !sstable.metadata().isIndex() && // exclude SSTables from 2i + new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges); + } + }, true, false); //ephemeral snapshot, if repair fails, it will be cleaned next startup + } + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java new file mode 100644 index 0000000..992a384 --- /dev/null +++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.repair; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.LongPredicate; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.CompactionIterator; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.metrics.CompactionMetrics; +import org.apache.cassandra.repair.ValidationPartitionIterator; +import org.apache.cassandra.repair.Validator; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Refs; + +public class 
CassandraValidationIterator extends ValidationPartitionIterator +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraValidationIterator.class); + + /* + * Controller for validation compaction that always purges. + * Note that we should not call cfs.getOverlappingSSTables on the provided + * sstables because those sstables are not guaranteed to be active sstables + * (since we can run repair on a snapshot). + */ + private static class ValidationCompactionController extends CompactionController + { + public ValidationCompactionController(ColumnFamilyStore cfs, int gcBefore) + { + super(cfs, gcBefore); + } + + @Override + public LongPredicate getPurgeEvaluator(DecoratedKey key) + { + /* + * The main reason we always purge is that including gcable tombstone would mean that the + * repair digest will depends on the scheduling of compaction on the different nodes. This + * is still not perfect because gcbefore is currently dependend on the current time at which + * the validation compaction start, which while not too bad for normal repair is broken for + * repair on snapshots. A better solution would be to agree on a gcbefore that all node would + * use, and we'll do that with CASSANDRA-4932. + * Note validation compaction includes all sstables, so we don't have the problem of purging + * a tombstone that could shadow a column in another sstable, but this is doubly not a concern + * since validation compaction is read-only. + */ + return time -> true; + } + } + + public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec) + { + // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to + // add any GcGrace however since 2ndary indexes are local to a node. + return cfs.isIndex() ? 
nowInSec : cfs.gcBefore(nowInSec); + } + + private static class ValidationCompactionIterator extends CompactionIterator + { + public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, CompactionMetrics metrics) + { + super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics); + } + } + + private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind previewKind) + { + switch (previewKind) + { + case ALL: + return (s) -> true; + case REPAIRED: + return (s) -> s.isRepaired(); + case UNREPAIRED: + return (s) -> !s.isRepaired(); + default: + throw new RuntimeException("Can't get preview predicate for preview kind " + previewKind); + } + } + + @VisibleForTesting + static synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID parentId, boolean isIncremental) + { + Refs<SSTableReader> sstables; + + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentId); + if (prs == null) + { + // this means the parent repair session was removed - the repair session failed on another node and we removed it + return new Refs<>(); + } + + Set<SSTableReader> sstablesToValidate = new HashSet<>(); + + com.google.common.base.Predicate<SSTableReader> predicate; + if (prs.isPreview()) + { + predicate = getPreviewPredicate(prs.previewKind); + + } + else if (isIncremental) + { + predicate = s -> parentId.equals(s.getSSTableMetadata().pendingRepair); + } + else + { + // note that we always grab all existing sstables for this - if we were to just grab the ones that + // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream + predicate = (s) -> !prs.isIncremental || !s.isRepaired(); + } + + try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate))) + 
{ + for (SSTableReader sstable : sstableCandidates.sstables) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges)) + { + sstablesToValidate.add(sstable); + } + } + + sstables = Refs.tryRef(sstablesToValidate); + if (sstables == null) + { + logger.error("Could not reference sstables"); + throw new RuntimeException("Could not reference sstables"); + } + } + + return sstables; + } + + private final ColumnFamilyStore cfs; + private final Refs<SSTableReader> sstables; + private final String snapshotName; + private final boolean isGlobalSnapshotValidation; + + private final boolean isSnapshotValidation; + private final AbstractCompactionStrategy.ScannerList scanners; + private final ValidationCompactionController controller; + + private final CompactionIterator ci; + + private final long estimatedBytes; + private final long estimatedPartitions; + private final Map<Range<Token>, Long> rangePartitionCounts; + + public CassandraValidationIterator(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID parentId, UUID sessionID, boolean isIncremental, int nowInSec) throws IOException + { + this.cfs = cfs; + + isGlobalSnapshotValidation = cfs.snapshotExists(parentId.toString()); + if (isGlobalSnapshotValidation) + snapshotName = parentId.toString(); + else + snapshotName = sessionID.toString(); + isSnapshotValidation = cfs.snapshotExists(snapshotName); + + if (isSnapshotValidation) + { + // If there is a snapshot created for the session then read from there. + // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we + // are supposed to validate. 
+ sstables = cfs.getSnapshotSSTableReaders(snapshotName); + } + else + { + if (!isIncremental) + { + // flush first so everyone is validating data that is as similar as possible + StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); + } + sstables = getSSTablesToValidate(cfs, ranges, parentId, isIncremental); + } + + Preconditions.checkArgument(sstables != null); + controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, nowInSec)); + scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges); + ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.getMetrics()); + + long allPartitions = 0; + rangePartitionCounts = Maps.newHashMapWithExpectedSize(ranges.size()); + for (Range<Token> range : ranges) + { + long numPartitions = 0; + for (SSTableReader sstable : sstables) + numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range)); + rangePartitionCounts.put(range, numPartitions); + allPartitions += numPartitions; + } + estimatedPartitions = allPartitions; + + long estimatedTotalBytes = 0; + for (SSTableReader sstable : sstables) + { + for (Pair<Long, Long> positionsForRanges : sstable.getPositionsForRanges(ranges)) + estimatedTotalBytes += positionsForRanges.right - positionsForRanges.left; + } + estimatedBytes = estimatedTotalBytes; + } + + @Override + public void close() + { + // TODO: can any of this fail and leave stuff unreleased? + super.close(); + + if (ci != null) + ci.close(); + + if (scanners != null) + scanners.close(); + + if (controller != null) + controller.close(); + + if (isSnapshotValidation && !isGlobalSnapshotValidation) + { + // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction + // is done). 
+ cfs.clearSnapshot(snapshotName); + } + + if (sstables != null) + sstables.release(); + } + + @Override + public TableMetadata metadata() + { + return cfs.metadata.get(); + } + + private void throwIfStopRequested() + { + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); + } + + @Override + public boolean hasNext() + { + throwIfStopRequested(); + return ci.hasNext(); + } + + @Override + public UnfilteredRowIterator next() + { + throwIfStopRequested(); + return ci.next(); + } + + @Override + public long getEstimatedBytes() + { + return estimatedBytes; + } + + @Override + public long estimatedPartitions() + { + return estimatedPartitions; + } + + @Override + public Map<Range<Token>, Long> getRangePartitionCounts() + { + return rangePartitionCounts; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java new file mode 100644 index 0000000..4e0f13d --- /dev/null +++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.MoreExecutors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.concurrent.Refs; + +/** + * Performs an anti compaction on a set of tables and token ranges, isolating the unrepaired sstables + * for a given token range into a pending repair group so they can't be compacted with other sstables + * while they are being repaired.
+ */ +public class PendingAntiCompaction +{ + private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompaction.class); + + static class AcquireResult + { + final ColumnFamilyStore cfs; + final Refs<SSTableReader> refs; + final LifecycleTransaction txn; + + AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, LifecycleTransaction txn) + { + this.cfs = cfs; + this.refs = refs; + this.txn = txn; + } + + void abort() + { + if (txn != null) + txn.abort(); + if (refs != null) + refs.release(); + } + } + + static class SSTableAcquisitionException extends RuntimeException {} + + static class AcquisitionCallable implements Callable<AcquireResult> + { + private final ColumnFamilyStore cfs; + private final Collection<Range<Token>> ranges; + private final UUID sessionID; + + public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID) + { + this.cfs = cfs; + this.ranges = ranges; + this.sessionID = sessionID; + } + + private Iterable<SSTableReader> getSSTables() + { + return Iterables.filter(cfs.getLiveSSTables(), s -> !s.isRepaired() && !s.isPendingRepair() && s.intersects(ranges)); + } + + @SuppressWarnings("resource") + private AcquireResult acquireTuple() + { + List<SSTableReader> sstables = Lists.newArrayList(getSSTables()); + if (sstables.isEmpty()) + return new AcquireResult(cfs, null, null); + + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + if (txn != null) + return new AcquireResult(cfs, Refs.ref(sstables), txn); + else + return null; + } + + public AcquireResult call() throws Exception + { + logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID); + AcquireResult refTxn = acquireTuple(); + if (refTxn != null) + return refTxn; + + // try to modify after cancelling running compactions. 
This will attempt to cancel in flight compactions for + // up to a minute, after which point, null will be returned + return cfs.runWithCompactionsDisabled(this::acquireTuple, false, false); + } + } + + static class AcquisitionCallback implements AsyncFunction<List<AcquireResult>, Object> + { + private final UUID parentRepairSession; + private final Collection<Range<Token>> ranges; + + public AcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges) + { + this.parentRepairSession = parentRepairSession; + this.ranges = ranges; + } + + ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result) + { + return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, result.refs, result.txn, parentRepairSession); + } + + public ListenableFuture apply(List<AcquireResult> results) throws Exception + { + if (Iterables.any(results, t -> t == null)) + { + // Release all sstables, and report failure back to coordinator + for (AcquireResult result : results) + { + if (result != null) + { + logger.info("Releasing acquired sstables for {}.{}", result.cfs.metadata.keyspace, result.cfs.metadata.name); + result.abort(); + } + } + logger.warn("Prepare phase for incremental repair session {} was unable to " + + "acquire exclusive access to the neccesary sstables. 
" + + "This is usually caused by running multiple incremental repairs on nodes that share token ranges", + parentRepairSession); + return Futures.immediateFailedFuture(new SSTableAcquisitionException()); + } + else + { + List<ListenableFuture<?>> pendingAntiCompactions = new ArrayList<>(results.size()); + for (AcquireResult result : results) + { + if (result.txn != null) + { + ListenableFuture<?> future = submitPendingAntiCompaction(result); + pendingAntiCompactions.add(future); + } + } + + return Futures.allAsList(pendingAntiCompactions); + } + } + } + + private final UUID prsId; + private final Collection<ColumnFamilyStore> tables; + private final Collection<Range<Token>> ranges; + private final ExecutorService executor; + + public PendingAntiCompaction(UUID prsId, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) + { + this.prsId = prsId; + this.tables = tables; + this.ranges = ranges; + this.executor = executor; + } + + public ListenableFuture run() + { + List<ListenableFutureTask<AcquireResult>> tasks = new ArrayList<>(tables.size()); + for (ColumnFamilyStore cfs : tables) + { + cfs.forceBlockingFlush(); + ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId)); + executor.submit(task); + tasks.add(task); + } + ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks); + ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, ranges), MoreExecutors.directExecutor()); + return compactionResult; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java new file mode 100644 index 
0000000..8aa4381 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.cassandra.repair;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Keyspace level hook for repair.
+ */
+public interface KeyspaceRepairManager
+{
+    /**
+     * Isolate the unrepaired data of the given tables in the given ranges, and make it referenceable by session id.
+     * Until each table has been notified that the repair session has been completed, the data associated with the
+     * given session id must not be combined with repaired or unrepaired data, or with data from other repair sessions.
+     *
+     * @param sessionID id of the incremental repair session the isolated data is associated with
+     * @param tables    tables whose data should be isolated
+     * @param ranges    token ranges being repaired
+     * @param executor  executor the implementation may use to run the preparation work
+     * @return a future that completes once the data has been isolated, or fails if it could not be isolated
+     */
+    ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 4c0a564..1e92a81 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -19,14 +19,10 @@ package org.apache.cassandra.repair; import java.util.*; -import com.google.common.base.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; @@ -103,21 +99,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> } ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); + TableRepairManager repairManager = cfs.getRepairManager(); if (prs.isGlobal) { - prs.maybeSnapshot(cfs.metadata.id, desc.parentSessionId); + repairManager.snapshot(desc.parentSessionId.toString(), prs.getRanges(), false); } else { - cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader sstable) - { - return sstable != null && - !sstable.metadata().isIndex() && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(desc.ranges); - } - },
true, false); //ephemeral snapshot, if repair fails, it will be cleaned next startup + repairManager.snapshot(desc.parentSessionId.toString(), desc.ranges, true); } logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); @@ -138,7 +127,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId); Validator validator = new Validator(desc, message.from, validationRequest.nowInSec, isIncremental(desc.parentSessionId), previewKind(desc.parentSessionId)); - CompactionManager.instance.submitValidation(store, validator); + ValidationManager.instance.submitValidation(store, validator); break; case SYNC_REQUEST: http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/TableRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/TableRepairManager.java b/src/java/org/apache/cassandra/repair/TableRepairManager.java new file mode 100644 index 0000000..b72af1d --- /dev/null +++ b/src/java/org/apache/cassandra/repair/TableRepairManager.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +/** + * Table level hook for repair + */ +public interface TableRepairManager +{ + /** + * Return a validation iterator for the given parameters. If isIncremental is true, the iterator must only include + * data previously isolated for repair with the given parentId. nowInSec should determine whether tombstones shouldn + * be purged or not. + */ + ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> ranges, UUID parentId, UUID sessionID, boolean isIncremental, int nowInSec) throws IOException; + + /** + * Begin execution of the given validation callable. Which thread pool a validation should run in is an implementation detail. + */ + Future<?> submitValidation(Callable<Object> validation); + + /** + * Called when the given incremental session has completed. Because of race and failure conditions, implementors + * should not rely only on receiving calls from this method to determine when a session has ended. Implementors + * can determine if a session has finished by calling ActiveRepairService.instance.consistent.local.isSessionInProgress. + * + * Just because a session has completed doesn't mean it's completed succesfully. 
So implementors need to consult the + * repair service at ActiveRepairService.instance.consistent.local.getFinalSessionRepairedAt to get the repairedAt + * time. If the repairedAt time is zero, the data for the given session should be demoted back to unrepaired. Otherwise, + * it should be promoted to repaired with the given repaired time. + */ + void incrementalSessionCompleted(UUID sessionID); + + /** + * For snapshot repairs. A snapshot of the current data for the given ranges should be taken with the given name. + * If force is true, a snapshot should be taken even if one already exists with that name. + */ + void snapshot(String name, Collection<Range<Token>> ranges, boolean force); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/ValidationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/ValidationManager.java b/src/java/org/apache/cassandra/repair/ValidationManager.java new file mode 100644 index 0000000..d664c8a --- /dev/null +++ b/src/java/org/apache/cassandra/repair/ValidationManager.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; + +public class ValidationManager +{ + private static final Logger logger = LoggerFactory.getLogger(ValidationManager.class); + + public static final ValidationManager instance = new ValidationManager(); + + private ValidationManager() {} + + private static MerkleTrees createMerkleTrees(ValidationPartitionIterator validationIterator, Collection<Range<Token>> ranges, ColumnFamilyStore cfs) + { + MerkleTrees tree = new MerkleTrees(cfs.getPartitioner()); + long allPartitions = validationIterator.estimatedPartitions(); + Map<Range<Token>, Long> rangePartitionCounts = validationIterator.getRangePartitionCounts(); + + for (Range<Token> range : ranges) + { + long numPartitions = rangePartitionCounts.get(range); + double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0; + // determine max tree depth proportional to range size to avoid blowing up memory with multiple tress, + // capping at 20 to prevent large tree (CASSANDRA-11390) + int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0; + // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263) + int depth = numPartitions > 0 ? 
(int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0; + tree.addMerkleTree((int) Math.pow(2, depth), range); + } + if (logger.isDebugEnabled()) + { + // MT serialize may take time + logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0)); + } + + return tree; + } + + private static ValidationPartitionIterator getValidationIterator(TableRepairManager repairManager, Validator validator) throws IOException + { + RepairJobDesc desc = validator.desc; + return repairManager.getValidationIterator(desc.ranges, desc.parentSessionId, desc.sessionId, validator.isIncremental, validator.nowInSec); + } + + /** + * Performs a readonly "compaction" of all sstables in order to validate complete rows, + * but without writing the merge result + */ + @SuppressWarnings("resource") + private void doValidation(ColumnFamilyStore cfs, Validator validator) throws IOException + { + // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped + // mid-validation, or to attempt to validate a droped CFS. this is just a best effort to avoid useless work, + // particularly in the scenario where a validation is submitted before the drop, and there are compactions + // started prior to the drop keeping some sstables alive. Since validationCompaction can run + // concurrently with other compactions, it would otherwise go ahead and scan those again. + if (!cfs.isValid()) + return; + + // Create Merkle trees suitable to hold estimated partitions for the given ranges. + // We blindly assume that a partition is evenly distributed on all sstables for now. 
+ long start = System.nanoTime(); + long partitionCount = 0; + long estimatedTotalBytes = 0; + try (ValidationPartitionIterator vi = getValidationIterator(cfs.getRepairManager(), validator)) + { + MerkleTrees tree = createMerkleTrees(vi, validator.desc.ranges, cfs); + try + { + // validate the CF as we iterate over it + validator.prepare(cfs, tree); + while (vi.hasNext()) + { + try (UnfilteredRowIterator partition = vi.next()) + { + validator.add(partition); + partitionCount++; + } + } + validator.complete(); + } + finally + { + estimatedTotalBytes = vi.getEstimatedBytes(); + partitionCount = vi.estimatedPartitions(); + } + } + finally + { + cfs.metric.bytesValidated.update(estimatedTotalBytes); + cfs.metric.partitionsValidated.update(partitionCount); + } + if (logger.isDebugEnabled()) + { + long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + logger.debug("Validation of {} partitions (~{}) finished in {} msec, for {}", + partitionCount, + FBUtilities.prettyPrintMemory(estimatedTotalBytes), + duration, + validator.desc); + } + } + + /** + * Does not mutate data, so is not scheduled. 
+ */ + public Future<?> submitValidation(ColumnFamilyStore cfs, Validator validator) + { + Callable<Object> validation = new Callable<Object>() + { + public Object call() throws IOException + { + try (TableMetrics.TableTimer.Context c = cfs.metric.validationTime.time()) + { + doValidation(cfs, validator); + } + catch (Throwable e) + { + // we need to inform the remote end of our failure, otherwise it will hang on repair forever + validator.fail(); + throw e; + } + return this; + } + }; + + return cfs.getRepairManager().submitValidation(validation); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java b/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java new file mode 100644 index 0000000..ccfae41 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.io.IOException; +import java.util.Map; + +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +public abstract class ValidationPartitionIterator extends AbstractUnfilteredPartitionIterator +{ + public abstract long getEstimatedBytes(); + public abstract long estimatedPartitions(); + public abstract Map<Range<Token>, Long> getRangePartitionCounts(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java index 78057e2..03de157 100644 --- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java @@ -64,10 +64,10 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin; * The consistent prepare step promotes the parent repair session to a consistent session, and isolates the sstables * being repaired other sstables. First, the coordinator sends a {@link PrepareConsistentRequest} message to each repair * participant (including itself). When received, the node creates a {@link LocalSession} instance, sets it's state to - * {@code PREPARING}, persists it, and begins a {@link PendingAntiCompaction} task. When the pending anti compaction - * completes, the session state is set to {@code PREPARED}, and a {@link PrepareConsistentResponse} is sent to the - * coordinator indicating success or failure. If the pending anti-compaction fails, the local session state is set - * to {@code FAILED}. 
+ * {@code PREPARING}, persists it, and begins preparing the tables for incremental repair, which segregates the data + * being repaired from the rest of the table data. When the preparation completes, the session state is set to + * {@code PREPARED}, and a {@link PrepareConsistentResponse} is sent to the coordinator indicating success or failure. + * If the preparation fails, the local session state is set to {@code FAILED}. * <p/> * (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, PrepareConsistentRequest)} * <p/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index e62f6fd..ed25166 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -19,10 +19,10 @@ package org.apache.cassandra.repair.consistent; import java.io.IOException; -import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.time.Instant; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -52,16 +52,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.repair.KeyspaceRepairManager; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; -import
org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.IPartitioner; @@ -81,8 +79,6 @@ import org.apache.cassandra.repair.messages.PrepareConsistentResponse; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.StatusRequest; import org.apache.cassandra.repair.messages.StatusResponse; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; @@ -547,19 +543,17 @@ public class LocalSessions } @VisibleForTesting - ListenableFuture submitPendingAntiCompaction(LocalSession session, ExecutorService executor) + ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) { - PendingAntiCompaction pac = new PendingAntiCompaction(session.sessionID, session.ranges, executor); - return pac.run(); + return repairManager.prepareIncrementalRepair(sessionID, tables, ranges, executor); } /** - * The PrepareConsistentRequest effectively promotes the parent repair session to a consistent - * incremental session, and begins the 'pending anti compaction' which moves all sstable data - * that is to be repaired into it's own silo, preventing it from mixing with other data. + * The PrepareConsistentRequest promotes the parent repair session to a consistent incremental + * session, and isolates the data to be repaired from the rest of the table's data * - * No response is sent to the repair coordinator until the pending anti compaction has completed - * successfully. 
If the pending anti compaction fails, a failure message is sent to the coordinator, + * No response is sent to the repair coordinator until the data preparation / isolation has completed + * successfully. If the data preparation fails, a failure message is sent to the coordinator, * cancelling the session. */ public void handlePrepareMessage(InetAddressAndPort from, PrepareConsistentRequest request) @@ -587,8 +581,10 @@ public class LocalSessions ExecutorService executor = Executors.newFixedThreadPool(parentSession.getColumnFamilyStores().size()); - ListenableFuture pendingAntiCompaction = submitPendingAntiCompaction(session, executor); - Futures.addCallback(pendingAntiCompaction, new FutureCallback() + KeyspaceRepairManager repairManager = parentSession.getKeyspace().getRepairManager(); + ListenableFuture repairPreparation = prepareSession(repairManager, sessionID, parentSession.getColumnFamilyStores(), parentSession.getRanges(), executor); + + Futures.addCallback(repairPreparation, new FutureCallback<Object>() { public void onSuccess(@Nullable Object result) { @@ -601,18 +597,6 @@ public class LocalSessions public void onFailure(Throwable t) { logger.error("Prepare phase for incremental repair session {} failed", sessionID, t); - if (t instanceof PendingAntiCompaction.SSTableAcquisitionException) - { - logger.warn("Prepare phase for incremental repair session {} was unable to " + - "acquire exclusive access to the neccesary sstables. 
" + - "This is usually caused by running multiple incremental repairs on nodes that share token ranges", - sessionID); - - } - else - { - logger.error("Prepare phase for incremental repair session {} failed", sessionID, t); - } sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)); failSession(sessionID, false); executor.shutdown(); @@ -673,7 +657,7 @@ public class LocalSessions ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid); if (cfs != null) { - CompactionManager.instance.submitBackground(cfs); + cfs.getRepairManager().incrementalSessionCompleted(session.sessionID); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java b/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java deleted file mode 100644 index dc3b45d..0000000 --- a/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair.consistent; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import com.google.common.util.concurrent.MoreExecutors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.concurrent.Refs; - -/** - * Performs an anti compaction on a set of tables and token ranges, isolating the unrepaired sstables - * for a give token range into a pending repair group so they can't be compacted with other sstables - * while they are being repaired. 
- */ -public class PendingAntiCompaction -{ - private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompaction.class); - - static class AcquireResult - { - final ColumnFamilyStore cfs; - final Refs<SSTableReader> refs; - final LifecycleTransaction txn; - - AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, LifecycleTransaction txn) - { - this.cfs = cfs; - this.refs = refs; - this.txn = txn; - } - - void abort() - { - if (txn != null) - txn.abort(); - if (refs != null) - refs.release(); - } - } - - static class SSTableAcquisitionException extends RuntimeException {} - - static class AcquisitionCallable implements Callable<AcquireResult> - { - private final ColumnFamilyStore cfs; - private final Collection<Range<Token>> ranges; - private final UUID sessionID; - - public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID) - { - this.cfs = cfs; - this.ranges = ranges; - this.sessionID = sessionID; - } - - private Iterable<SSTableReader> getSSTables() - { - return Iterables.filter(cfs.getLiveSSTables(), s -> !s.isRepaired() && !s.isPendingRepair() && s.intersects(ranges)); - } - - @SuppressWarnings("resource") - private AcquireResult acquireTuple() - { - List<SSTableReader> sstables = Lists.newArrayList(getSSTables()); - if (sstables.isEmpty()) - return new AcquireResult(cfs, null, null); - - LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); - if (txn != null) - return new AcquireResult(cfs, Refs.ref(sstables), txn); - else - return null; - } - - public AcquireResult call() throws Exception - { - logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID); - AcquireResult refTxn = acquireTuple(); - if (refTxn != null) - return refTxn; - - // try to modify after cancelling running compactions. 
This will attempt to cancel in flight compactions for - // up to a minute, after which point, null will be returned - return cfs.runWithCompactionsDisabled(this::acquireTuple, false, false); - } - } - - static class AcquisitionCallback implements AsyncFunction<List<AcquireResult>, Object> - { - private final UUID parentRepairSession; - private final Collection<Range<Token>> ranges; - - public AcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges) - { - this.parentRepairSession = parentRepairSession; - this.ranges = ranges; - } - - ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result) - { - return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, result.refs, result.txn, parentRepairSession); - } - - public ListenableFuture apply(List<AcquireResult> results) throws Exception - { - if (Iterables.any(results, t -> t == null)) - { - // Release all sstables, and report failure back to coordinator - for (AcquireResult result : results) - { - if (result != null) - { - logger.info("Releasing acquired sstables for {}.{}", result.cfs.metadata.keyspace, result.cfs.metadata.name); - result.abort(); - } - } - return Futures.immediateFailedFuture(new SSTableAcquisitionException()); - } - else - { - List<ListenableFuture<?>> pendingAntiCompactions = new ArrayList<>(results.size()); - for (AcquireResult result : results) - { - if (result.txn != null) - { - ListenableFuture<?> future = submitPendingAntiCompaction(result); - pendingAntiCompactions.add(future); - } - } - - return Futures.allAsList(pendingAntiCompactions); - } - } - } - - private final UUID prsId; - private final Collection<Range<Token>> ranges; - private final ExecutorService executor; - - public PendingAntiCompaction(UUID prsId, Collection<Range<Token>> ranges, ExecutorService executor) - { - this.prsId = prsId; - this.ranges = ranges; - this.executor = executor; - } - - public ListenableFuture run() - { - ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId); - Collection<ColumnFamilyStore> cfss = prs.getColumnFamilyStores(); - List<ListenableFutureTask<AcquireResult>> tasks = new ArrayList<>(cfss.size()); - for (ColumnFamilyStore cfs : cfss) - { - cfs.forceBlockingFlush(); - ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId)); - executor.submit(task); - tasks.add(task); - } - ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks); - ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, ranges), MoreExecutors.directExecutor()); - return compactionResult; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org