This is an automated email from the ASF dual-hosted git repository. tolbertam pushed a commit to branch auto_repair_v2_on_4_1_splitter_changes in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 443148eeadceaf8b2881b46fb7ae0b983e0036bf Author: Andy Tolbert <[email protected]> AuthorDate: Wed Sep 25 18:05:30 2024 -0500 Update IAutoRepairTokenRangeSplitter API to be more flexible Updates the IAutoRepairTokenRangeSplitter API to take in a set of tables instead of a single table and to return "RepairAssignments". This gives the API more control over how repairs are issued by the AutoRepair framework, by allowing repairs to be grouped by specific tables or issued as a single repair at the keyspace level. Also makes the token_range_splitter configuration option a ParameterizedClass, so custom IAutoRepairTokenRangeSplitter implementations can have their own configuration. Update AutoRepair to account for these changes. Update DefaultAutoRepairTokenSplitter to account for the change, and to also honor the getRepairByKeyspace option. This also improves the filtering of tables that should not be repaired; previously, that filtering was not applied when getRepairByKeyspace was enabled. Move the logic that splits token ranges evenly into AutoRepairUtils so it can be reused by other splitters. Adds a prototype UnrepairedBytesBasedTokenRangeSplitter that demonstrates consuming splitter-specific properties to bound the repair assignments it returns. 
--- .../cassandra/config/ParameterizedClass.java | 3 +- .../io/sstable/format/big/BigTableScanner.java | 44 ++- .../cassandra/repair/autorepair/AutoRepair.java | 234 ++++++++------- .../repair/autorepair/AutoRepairConfig.java | 15 +- .../repair/autorepair/AutoRepairState.java | 21 ++ .../repair/autorepair/AutoRepairUtils.java | 31 ++ .../autorepair/DefaultAutoRepairTokenSplitter.java | 59 ++-- .../autorepair/IAutoRepairTokenRangeSplitter.java | 76 ++++- .../UnrepairedBytesBasedTokenRangeSplitter.java | 332 +++++++++++++++++++++ .../schema/SystemDistributedKeyspace.java | 2 +- .../org/apache/cassandra/utils/FBUtilities.java | 29 +- .../repair/autorepair/AutoRepairConfigTest.java | 20 +- .../autorepair/AutoRepairParameterizedTest.java | 21 +- 13 files changed, 716 insertions(+), 171 deletions(-) diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java b/src/java/org/apache/cassandra/config/ParameterizedClass.java index 9b001786e3..a101caa92c 100644 --- a/src/java/org/apache/cassandra/config/ParameterizedClass.java +++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.config; +import java.io.Serializable; import java.util.List; import java.util.Map; @@ -27,7 +28,7 @@ import org.apache.cassandra.utils.Shared; import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) -public class ParameterizedClass +public class ParameterizedClass implements Serializable { public static final String CLASS_NAME = "class_name"; public static final String PARAMETERS = "parameters"; diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index 235b9b1a71..c1396534a6 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -117,7 +117,7 @@ public class 
BigTableScanner implements ISSTableScanner this.listener = listener; } - private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges) + public static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges) { List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size()); for (Range<Token> range : Range.normalize(tokenRanges)) @@ -209,6 +209,48 @@ public class BigTableScanner implements ISSTableScanner } } + /** + * Gets the position in the data file, but does not seek to it. This does seek the index to find the data position + * but does not actually seek the data file. + * @param position position to find in data file. + * @return + * @throws CorruptSSTableException + */ + public long getDataPosition(PartitionPosition position) throws CorruptSSTableException { + + long indexPosition = sstable.getIndexScanPosition(position); + ifile.seek(indexPosition); + try + { + + while (!ifile.isEOF()) + { + indexPosition = ifile.getFilePointer(); + DecoratedKey indexDecoratedKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + if (indexDecoratedKey.compareTo(position) > 0) + { + // Found, just read the dataPosition and seek into index and data files + long dataPosition = RowIndexEntry.Serializer.readPosition(ifile); + // seek the index file position as we will presumably seek further when goign to the next position. 
+ ifile.seek(indexPosition); + return dataPosition; + } + else + { + RowIndexEntry.Serializer.skip(ifile, sstable.descriptor.version); + } + } + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + // If for whatever reason we don't find position in file, just return 0 + return 0L; + } + + public void close() { try diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java index 69db9e609a..67dedd8256 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java +++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.EnumMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -33,9 +34,11 @@ import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.repair.RepairRunnable; +import org.apache.cassandra.repair.autorepair.IAutoRepairTokenRangeSplitter.RepairAssignment; import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; @@ -203,149 +206,142 @@ public class AutoRepair } repairState.setRepairKeyspaceCount(repairState.getRepairKeyspaceCount() + 1); - List<String> tablesToBeRepaired = retrieveTablesToBeRepaired(keyspace, repairType, repairState); - shuffleFunc.accept(tablesToBeRepaired); - for (String tableName : tablesToBeRepaired) + List<String> tablesToBeRepairedList = retrieveTablesToBeRepaired(keyspace, repairType, repairState); + // TODO: Should we shuffle repair assignments instead of tables? Side effect of that is will + // shuffle the ranges of the table being repaired as well. 
+ shuffleFunc.accept(tablesToBeRepairedList); + Set<String> tablesToBeRepaired = new LinkedHashSet<>(tablesToBeRepairedList); + + String keyspaceName = keyspace.getName(); + List<RepairAssignment> repairAssignments = tokenRangeSplitters.get(repairType).getRepairAssignments(repairType, primaryRangeOnly, keyspaceName, tablesToBeRepaired); + + int totalRepairAssignments = repairAssignments.size(); + long keyspaceStartTime = timeFunc.get(); + for (RepairAssignment repairAssignment : repairAssignments) { - String keyspaceName = keyspace.getName(); try { - List<Pair<Token, Token>> subRangesToBeRepaired = tokenRangeSplitters.get(repairType).getRange(repairType, primaryRangeOnly, keyspaceName, tableName); - int totalSubRanges = subRangesToBeRepaired.size(); - - ColumnFamilyStore columnFamilyStore = keyspace.getColumnFamilyStore(tableName); - if (!columnFamilyStore.metadata().params.automatedRepair.get(repairType).repairEnabled()) + boolean skipAssignment = false; + for (String tableName : repairAssignment.getTableNames()) { - logger.info("Repair is disabled for keyspace {} for tables: {}", keyspaceName, tableName); - repairState.setTotalDisabledTablesRepairCount(repairState.getTotalDisabledTablesRepairCount() + 1); - continue; + ColumnFamilyStore columnFamilyStore = keyspace.getColumnFamilyStore(tableName); + // this is done to make autorepair safe as running repair on table with more sstables + // may have its own challenges + int size = columnFamilyStore.getLiveSSTables().size(); + if (size > config.getRepairSSTableCountHigherThreshold(repairType)) + { + // TODO: Maybe just exclude assignment for this table? + // TODO: how should this effect the calculation of skippedTokenRanges? 
+ logger.info("Too many SSTables for repair for table {}.{}, not doing repair on RepairAssignment {} " + + "totalSSTables {}", keyspaceName, tableName, repairAssignment, columnFamilyStore.getLiveSSTables().size()); + skippedTokenRanges += 1; + skipAssignment = true; + break; + } } - // this is done to make autorepair safe as running repair on table with more sstables - // may have its own challenges - int size = columnFamilyStore.getLiveSSTables().size(); - if (size > config.getRepairSSTableCountHigherThreshold(repairType)) + + if (skipAssignment) { - logger.info("Too many SSTables for repair, not doing repair on table {}.{} " + - "totalSSTables {}", keyspaceName, tableName, columnFamilyStore.getLiveSSTables().size()); - skippedTokenRanges += totalSubRanges; continue; } - if (config.getRepairByKeyspace(repairType)) + long tableStartTime = timeFunc.get(); + Set<Range<Token>> ranges = new HashSet<>(); + int totalProcessedAssignments = 0; + + if (!config.isAutoRepairEnabled(repairType)) { - logger.info("Repair keyspace {} for tables: {}", keyspaceName, tablesToBeRepaired); + logger.error("Auto-repair for type {} is disabled hence not running repair", repairType); + repairState.setRepairInProgress(false); + return; } - else + + if (AutoRepairUtils.keyspaceMaxRepairTimeExceeded(repairType, keyspaceStartTime, tablesToBeRepaired.size())) { - logger.info("Repair table {}.{}", keyspaceName, tableName); + skippedTokenRanges += totalRepairAssignments - totalProcessedAssignments; + logger.info("Keyspace took too much time to repair hence skipping it {}", + keyspaceName); + break; } - long tableStartTime = timeFunc.get(); - Set<Range<Token>> ranges = new HashSet<>(); - int totalProcessedSubRanges = 0; - for (Pair<Token, Token> token : subRangesToBeRepaired) + + // TODO: now that we are doing things assignment wise, how do reason about this? 
+ /* + if (AutoRepairUtils.tableMaxRepairTimeExceeded(repairType, tableStartTime)) { - if (!config.isAutoRepairEnabled(repairType)) - { - logger.error("Auto-repair for type {} is disabled hence not running repair", repairType); - repairState.setRepairInProgress(false); - return; - } + skippedTokenRanges += totalSubRanges - totalProcessedAssignments; + logger.info("Table took too much time to repair hence skipping it {}.{}", + keyspaceName, tableName); + break; + }*/ - if (config.getRepairByKeyspace(repairType)) + Range<Token> tokenRange = repairAssignment.getTokenRange(); + logger.debug("Current Token Left side {}, right side {}", + tokenRange.left.toString(), + tokenRange.right.toString()); + + ranges.add(repairAssignment.getTokenRange()); + totalProcessedAssignments++; + if ((totalProcessedAssignments % config.getRepairThreads(repairType) == 0) || + (totalProcessedAssignments == totalRepairAssignments)) + { + int retryCount = 0; + Future<?> f = null; + while (retryCount <= config.getRepairMaxRetries()) { - if (AutoRepairUtils.keyspaceMaxRepairTimeExceeded(repairType, tableStartTime, tablesToBeRepaired.size())) + // TODO have getRepairRunnable accept a Set of tables instead of list. 
+ RepairRunnable task = repairState.getRepairRunnable(keyspaceName, + Lists.newArrayList(repairAssignment.getTableNames()), + ranges, primaryRangeOnly); + repairState.resetWaitCondition(); + f = repairRunnableExecutors.get(repairType).submit(task); + try { - skippedTokenRanges += totalSubRanges - totalProcessedSubRanges; - logger.info("Keyspace took too much time to repair hence skipping it {}", - keyspaceName); - break; + repairState.waitForRepairToComplete(config.getRepairSessionTimeout(repairType)); } - } - else - { - if (AutoRepairUtils.tableMaxRepairTimeExceeded(repairType, tableStartTime)) + catch (InterruptedException e) { - skippedTokenRanges += totalSubRanges - totalProcessedSubRanges; - logger.info("Table took too much time to repair hence skipping it {}.{}", - keyspaceName, tableName); - break; + logger.error("Exception in cond await:", e); } - } - Token childStartToken = token.left; - Token childEndToken = token.right; - logger.debug("Current Token Left side {}, right side {}", childStartToken - .toString(), childEndToken.toString()); - - ranges.add(new Range<>(childStartToken, childEndToken)); - totalProcessedSubRanges++; - if ((totalProcessedSubRanges % config.getRepairThreads(repairType) == 0) || - (totalProcessedSubRanges == totalSubRanges)) - { - int retryCount = 0; - Future<?> f = null; - while (retryCount <= config.getRepairMaxRetries()) + if (repairState.isSuccess()) { - RepairRunnable task = repairState.getRepairRunnable(keyspaceName, - config.getRepairByKeyspace(repairType) ? 
tablesToBeRepaired : ImmutableList.of(tableName), - ranges, primaryRangeOnly); - repairState.resetWaitCondition(); - f = repairRunnableExecutors.get(repairType).submit(task); - try - { - repairState.waitForRepairToComplete(config.getRepairSessionTimeout(repairType)); - } - catch (InterruptedException e) - { - logger.error("Exception in cond await:", e); - } - if (repairState.isSuccess()) - { - break; - } - else if (retryCount < config.getRepairMaxRetries()) - { - boolean cancellationStatus = f.cancel(true); - logger.warn("Repair failed for range {}-{} for {}.{} with cancellationStatus: {} retrying after {} seconds...", - childStartToken, childEndToken, - keyspaceName, config.getRepairByKeyspace(repairType) ? tablesToBeRepaired : tableName, - cancellationStatus, config.getRepairRetryBackoff().toSeconds()); - sleepFunc.accept(config.getRepairRetryBackoff().toSeconds(), TimeUnit.SECONDS); - } - retryCount++; + break; } - //check repair status - if (repairState.isSuccess()) + else if (retryCount < config.getRepairMaxRetries()) { - logger.info("Repair completed for range {}-{} for {}.{}, total subranges: {}," + - "processed subranges: {}", childStartToken, childEndToken, - keyspaceName, config.getRepairByKeyspace(repairType) ? 
tablesToBeRepaired : tableName, totalSubRanges, totalProcessedSubRanges); - succeededTokenRanges += ranges.size(); + boolean cancellationStatus = f.cancel(true); + logger.warn("Repair failed for range {}-{} for {} tables {} with cancellationStatus: {} retrying after {} seconds...", + tokenRange.left, tokenRange.right, + keyspaceName, repairAssignment.getTableNames(), + cancellationStatus, config.getRepairRetryBackoff().toSeconds()); + sleepFunc.accept(config.getRepairRetryBackoff().toSeconds(), TimeUnit.SECONDS); } - else + retryCount++; + } + //check repair status + if (repairState.isSuccess()) + { + logger.info("Repair completed for range {}-{} for {} tables {}, total assignments: {}," + + "processed assignments: {}", tokenRange.left, tokenRange.right, + keyspaceName, repairAssignment.getTableNames(), totalRepairAssignments, totalProcessedAssignments); + succeededTokenRanges += ranges.size(); + } + else + { + boolean cancellationStatus = true; + if (f != null) { - boolean cancellationStatus = true; - if (f != null) - { - cancellationStatus = f.cancel(true); - } - //in the future we can add retry, etc. - logger.error("Repair failed for range {}-{} for {}.{} after {} retries, total subranges: {}," + - "processed subranges: {}, cancellationStatus: {}", childStartToken.toString(), childEndToken.toString(), keyspaceName, - config.getRepairByKeyspace(repairType) ? tablesToBeRepaired : tableName, retryCount, totalSubRanges, totalProcessedSubRanges, cancellationStatus); - failedTokenRanges += ranges.size(); + cancellationStatus = f.cancel(true); } - ranges.clear(); + //in the future we can add retry, etc. 
+ logger.error("Repair failed for range {}-{} for {} tables {} after {} retries, total assignments: {}," + + "processed assignments: {}, cancellationStatus: {}", tokenRange.left, tokenRange.right, keyspaceName, + repairAssignment.getTableNames(), retryCount, totalRepairAssignments, totalProcessedAssignments, cancellationStatus); + failedTokenRanges += ranges.size(); } + ranges.clear(); } - if (config.getRepairByKeyspace(repairType)) - { - logger.info("Repair completed for keyspace {}, tables: {}", keyspaceName, tablesToBeRepaired); - break; - } - else - { - logger.info("Repair completed for {}.{}", keyspaceName, tableName); - } + // TODO: this format used a bit, maybe use RepairAssignment.toString instead + logger.info("Repair completed for {} tables {}, range {}", keyspaceName, repairAssignment.getTableNames(), repairAssignment.getTokenRange()); } catch (Exception e) { @@ -398,6 +394,14 @@ public class AutoRepair repairState.setTotalTablesConsideredForRepair(repairState.getTotalTablesConsideredForRepair() + 1); TableMetadata tableMetadata = iter.next(); String tableName = tableMetadata.name; + + ColumnFamilyStore columnFamilyStore = keyspace.getColumnFamilyStore(tableName); + if (!columnFamilyStore.metadata().params.automatedRepair.get(repairType).repairEnabled()) + { + logger.info("Repair is disabled for keyspace {} for tables: {}", keyspace.getName(), tableName); + repairState.setTotalDisabledTablesRepairCount(repairState.getTotalDisabledTablesRepairCount() + 1); + continue; + } tablesToBeRepaired.add(tableName); // See if we should repair MVs as well that are associated with this given table diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java index 97868a594f..7accc9ecd7 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java +++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java @@ -20,6 +20,7 @@ package 
org.apache.cassandra.repair.autorepair; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumMap; import java.util.HashSet; import java.util.Map; @@ -32,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.DurationSpec; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.exceptions.ConfigurationException; public class AutoRepairConfig implements Serializable @@ -59,7 +61,8 @@ public class AutoRepairConfig implements Serializable public enum RepairType { full, - incremental; + incremental, + preview_repaired; public static AutoRepairState getAutoRepairState(RepairType repairType) { @@ -69,6 +72,8 @@ public class AutoRepairConfig implements Serializable return new FullRepairState(); case incremental: return new IncrementalRepairState(); + case preview_repaired: + return new PreviewRepairedState(); } throw new IllegalArgumentException("Invalid repair type: " + repairType); @@ -280,7 +285,7 @@ public class AutoRepairConfig implements Serializable return applyOverrides(repairType, opt -> opt.force_repair_new_node); } - public String getTokenRangeSplitter(RepairType repairType) + public ParameterizedClass getTokenRangeSplitter(RepairType repairType) { return applyOverrides(repairType, opt -> opt.token_range_splitter); } @@ -340,7 +345,7 @@ public class AutoRepairConfig implements Serializable opts.force_repair_new_node = false; opts.table_max_repair_time = new DurationSpec.IntSecondsBound("6h"); opts.mv_repair_enabled = false; - opts.token_range_splitter = DefaultAutoRepairTokenSplitter.class.getName(); + opts.token_range_splitter = new ParameterizedClass(DefaultAutoRepairTokenSplitter.class.getName(), Collections.emptyMap()); opts.initial_scheduler_delay = new DurationSpec.IntSecondsBound("15m"); // 15 minutes opts.repair_session_timeout = new DurationSpec.IntSecondsBound("3h"); // 
3 hours @@ -404,9 +409,9 @@ public class AutoRepairConfig implements Serializable // the default is 'true'. // This flag determines whether the auto-repair framework needs to run anti-entropy, a.k.a, repair on the MV table or not. public volatile Boolean mv_repair_enabled; - // the default is DefaultAutoRepairTokenSplitter.class.getName(). The class should implement IAutoRepairTokenRangeSplitter. + // the default is DefaultAutoRepairTokenSplitter. The class should implement IAutoRepairTokenRangeSplitter. // The default implementation splits the tokens based on the token ranges owned by this node divided by the number of 'number_of_subranges' - public volatile String token_range_splitter; + public volatile ParameterizedClass token_range_splitter; // the minimum delay after a node starts before the scheduler starts running repair public volatile DurationSpec.IntSecondsBound initial_scheduler_delay; // repair session timeout - this is applicable for each repair session diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java index a4869c7742..a9f9113750 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java +++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java @@ -285,6 +285,27 @@ public abstract class AutoRepairState implements ProgressListener } } +class PreviewRepairedState extends AutoRepairState +{ + public PreviewRepairedState() + { + super(RepairType.preview_repaired); + } + + + @Override + public RepairRunnable getRepairRunnable(String keyspace, List<String> tables, Set<Range<Token>> ranges, boolean primaryRangeOnly) + { + RepairOption option = new RepairOption(RepairParallelism.PARALLEL, primaryRangeOnly, false, false, + AutoRepairService.instance.getAutoRepairConfig().getRepairThreads(repairType), ranges, + !ranges.isEmpty(), false, false, PreviewKind.REPAIRED, false, true, false, false); + + 
option.getColumnFamilies().addAll(tables); + + return getRepairRunnable(keyspace, option); + } +} + class IncrementalRepairState extends AutoRepairState { public IncrementalRepairState() diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java index 9ef64c9bac..636a002507 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java +++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java @@ -32,6 +32,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.LocalStrategy; import org.slf4j.Logger; @@ -832,4 +834,33 @@ public class AutoRepairUtils } return allMvs; } + + // TODO: test + public static List<Range<Token>> splitEvenly(Range<Token> tokenRange, int numberOfSplits) + { + List<Range<Token>> splitRanges = new ArrayList<>(); + long left = (Long) tokenRange.left.getTokenValue(); + long right = (Long) tokenRange.right.getTokenValue(); + long repairTokenWidth = (right - left) / numberOfSplits; + for (int i = 0; i < numberOfSplits; i++) + { + long curLeft = left + (i * repairTokenWidth); + long curRight = curLeft + repairTokenWidth; + + if ((i + 1) == numberOfSplits) + { + curRight = right; + } + + Token childStartToken = StorageService.instance.getTokenMetadata() + .partitioner.getTokenFactory().fromString("" + curLeft); + Token childEndToken = StorageService.instance.getTokenMetadata() + .partitioner.getTokenFactory().fromString("" + curRight); + logger.debug("Current Token Left side {}, right side {}", childStartToken + .toString(), childEndToken.toString()); + Range<Token> splitRange = new Range<>(childStartToken, childEndToken); + splitRanges.add(splitRange); + } + return splitRanges; + } } diff --git 
a/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java b/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java index 7c254fd358..a58aede3af 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java +++ b/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java @@ -20,7 +20,10 @@ package org.apache.cassandra.repair.autorepair; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.cassandra.service.AutoRepairService; @@ -33,15 +36,18 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.repair.autorepair.AutoRepairUtils.splitEvenly; + public class DefaultAutoRepairTokenSplitter implements IAutoRepairTokenRangeSplitter { private static final Logger logger = LoggerFactory.getLogger(DefaultAutoRepairTokenSplitter.class); @Override - public List<Pair<Token, Token>> getRange(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, String tableName) + public List<RepairAssignment> getRepairAssignments(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, Set<String> tableNames) { - List<Pair<Token, Token>> range = new ArrayList<>(); + AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig(); + List<RepairAssignment> repairAssignments = new ArrayList<>(); Collection<Range<Token>> tokens = StorageService.instance.getPrimaryRanges(keyspaceName); if (!primaryRangeOnly) @@ -49,41 +55,36 @@ public class DefaultAutoRepairTokenSplitter implements IAutoRepairTokenRangeSpli // if we need to repair non-primary token ranges, then change the tokens accrodingly tokens = StorageService.instance.getLocalReplicas(keyspaceName).ranges(); } - 
AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig(); int numberOfSubranges = config.getRepairSubRangeNum(repairType); + + boolean byKeyspace = config.getRepairByKeyspace(repairType); + + // collect all token ranges. + List<Range<Token>> allRanges = new ArrayList<>(); for (Range<Token> token : tokens) { - Murmur3Partitioner.LongToken l = (Murmur3Partitioner.LongToken) (token.left); - Murmur3Partitioner.LongToken r = (Murmur3Partitioner.LongToken) (token.right); - Token parentStartToken = StorageService.instance.getTokenMetadata() - .partitioner.getTokenFactory().fromString("" + l.getTokenValue()); - Token parentEndToken = StorageService.instance.getTokenMetadata() - .partitioner.getTokenFactory().fromString("" + r.getTokenValue()); - logger.debug("Parent Token Left side {}, right side {}", parentStartToken.toString(), - parentEndToken.toString()); + allRanges.addAll(splitEvenly(token, numberOfSubranges)); + } - long left = (Long) l.getTokenValue(); - long right = (Long) r.getTokenValue(); - long repairTokenWidth = (right - left) / numberOfSubranges; - for (int i = 0; i < numberOfSubranges; i++) + if (byKeyspace) + { + for (Range<Token> splitRange : allRanges) { - long curLeft = left + (i * repairTokenWidth); - long curRight = curLeft + repairTokenWidth; - - if ((i + 1) == numberOfSubranges) + // add repair assignment for each range entire keyspace's tables + repairAssignments.add(new RepairAssignment(splitRange, keyspaceName, tableNames)); + } + } + else + { + // add repair assignment per table + for (String tableName : tableNames) + { + for (Range<Token> splitRange : allRanges) { - curRight = right; + repairAssignments.add(new RepairAssignment(splitRange, keyspaceName, Collections.singleton(tableName))); } - - Token childStartToken = StorageService.instance.getTokenMetadata() - .partitioner.getTokenFactory().fromString("" + curLeft); - Token childEndToken = StorageService.instance.getTokenMetadata() - 
.partitioner.getTokenFactory().fromString("" + curRight); - logger.debug("Current Token Left side {}, right side {}", childStartToken - .toString(), childEndToken.toString()); - range.add(Pair.create(childStartToken, childEndToken)); } } - return range; + return repairAssignments; } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java b/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java index 2b69898a36..fd63769431 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java +++ b/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java @@ -19,13 +19,83 @@ package org.apache.cassandra.repair.autorepair; import java.util.List; +import java.util.Objects; +import java.util.Set; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.Pair; public interface IAutoRepairTokenRangeSplitter { - // split the token range you wish to repair into multiple subranges - // the autorepair framework will repair the list of returned subrange in a sequence - List<Pair<Token, Token>> getRange(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, String tableName); + + /** + * Split the token range you wish to repair into multiple assignments. + * The autorepair framework will repair the list of returned subrange in a sequence. + * @param repairType The type of repair being executed + * @param primaryRangeOnly Whether to repair only this node's primary ranges or all of its ranges. + * @param keyspaceName The keyspace being repaired + * @param tableNames The tables to repair + * @return repair assignments broken up by range, keyspace and tables. 
+ */ + List<RepairAssignment> getRepairAssignments(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, Set<String> tableNames); + + /** + * Defines a repair assignment to be issued by the autorepair framework. + */ + class RepairAssignment + { + final Range<Token> tokenRange; + + final String keyspaceName; + + final Set<String> tableNames; + + public RepairAssignment(Range<Token> tokenRange, String keyspaceName, Set<String> tableNames) + { + this.tokenRange = tokenRange; + this.keyspaceName = keyspaceName; + this.tableNames = tableNames; + } + + public Range<Token> getTokenRange() + { + return tokenRange; + } + + public String getKeyspaceName() + { + return keyspaceName; + } + + public Set<String> getTableNames() + { + return tableNames; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RepairAssignment that = (RepairAssignment) o; + return Objects.equals(tokenRange, that.tokenRange) && Objects.equals(keyspaceName, that.keyspaceName) && Objects.equals(tableNames, that.tableNames); + } + + @Override + public int hashCode() + { + return Objects.hash(tokenRange, keyspaceName, tableNames); + } + + @Override + public String toString() + { + return "RepairAssignment{" + + "tokenRange=" + tokenRange + + ", keyspaceName='" + keyspaceName + '\'' + + ", tableNames=" + tableNames + + '}'; + } + } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/repair/autorepair/UnrepairedBytesBasedTokenRangeSplitter.java b/src/java/org/apache/cassandra/repair/autorepair/UnrepairedBytesBasedTokenRangeSplitter.java new file mode 100644 index 0000000000..1e65d323d9 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/autorepair/UnrepairedBytesBasedTokenRangeSplitter.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.autorepair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.big.BigTableScanner; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.apache.cassandra.repair.autorepair.AutoRepairUtils.splitEvenly; + +public class UnrepairedBytesBasedTokenRangeSplitter implements IAutoRepairTokenRangeSplitter +{ + private static final Logger logger =
LoggerFactory.getLogger(UnrepairedBytesBasedTokenRangeSplitter.class); + + static final String SUBRANGE_SIZE = "subrange_size"; + static final String MAX_BYTES_PER_SCHEDULE = "max_bytes_per_schedule"; + + // target bytes per subrange + private final DataStorageSpec.LongBytesBound subrangeSize; + + // maximum target bytes to repair; note: enforced per table, as targetBytesSoFar is reset for each table in getRepairAssignmentsForTable + private final DataStorageSpec.LongBytesBound maxBytesPerSchedule; + + private final long subrangeBytes; + + private final long maxBytesPerScheduleBytes; + + private static final DataStorageSpec.LongBytesBound DEFAULT_SUBRANGE_SIZE = new DataStorageSpec.LongBytesBound("100GiB"); + private static final DataStorageSpec.LongBytesBound DEFAULT_MAX_BYTES_PER_SCHEDULE = new DataStorageSpec.LongBytesBound("500GiB"); + + public UnrepairedBytesBasedTokenRangeSplitter(Map<String, String> parameters) + { + // Demonstrates parameterizing a range splitter so we can have splitter specific options. + if (parameters.containsKey(SUBRANGE_SIZE)) + { + subrangeSize = new DataStorageSpec.LongBytesBound(parameters.get(SUBRANGE_SIZE)); + } + else + { + subrangeSize = DEFAULT_SUBRANGE_SIZE; + } + subrangeBytes = subrangeSize.toBytes(); + + if (parameters.containsKey(MAX_BYTES_PER_SCHEDULE)) + { + maxBytesPerSchedule = new DataStorageSpec.LongBytesBound(parameters.get(MAX_BYTES_PER_SCHEDULE)); + } + else + { + maxBytesPerSchedule = DEFAULT_MAX_BYTES_PER_SCHEDULE; + } + maxBytesPerScheduleBytes = maxBytesPerSchedule.toBytes(); + + logger.info("Configured {} with {}={}, {}={}", UnrepairedBytesBasedTokenRangeSplitter.class.getName(), + SUBRANGE_SIZE, subrangeSize, MAX_BYTES_PER_SCHEDULE, maxBytesPerSchedule); + } + + @Override + public List<RepairAssignment> getRepairAssignments(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, Set<String> tableNames) + { + List<RepairAssignment> repairAssignments = new ArrayList<>(); + + logger.info("Calculating token range splits for repairType={} primaryRangeOnly={} keyspaceName={}
tableNames={}", repairType, primaryRangeOnly, keyspaceName, tableNames); + if (repairType != AutoRepairConfig.RepairType.incremental) + { + throw new IllegalArgumentException(this.getClass().getName() + " only supports " + AutoRepairConfig.RepairType.incremental + " repair"); + } + + // TODO: create a custom repair assignment that indicates number of bytes in repair and join tables by byte size. + Collection<Range<Token>> tokenRanges = getTokenRanges(primaryRangeOnly, keyspaceName); + for (String tableName : tableNames) + { + repairAssignments.addAll(getRepairAssignmentsForTable(keyspaceName, tableName, tokenRanges)); + } + return repairAssignments; + } + + public List<RepairAssignment> getRepairAssignmentsForTable(String keyspaceName, String tableName, Collection<Range<Token>> tokenRanges) + { + List<RepairAssignment> repairAssignments = new ArrayList<>(); + + long targetBytesSoFar = 0; + + for (Range<Token> tokenRange : tokenRanges) + { + logger.debug("Calculating unrepaired bytes for {}.{} for range {}", keyspaceName, tableName, tokenRange); + // Capture the amount of unrepaired bytes for range + long approximateUnrepairedBytesForRange = 0L; + // Capture the total bytes in read sstables, this will be useful for calculating the ratio + // of data in SSTables including this range and also useful to know how much anticompaction there will be. + long totalBytesInUnrepairedSSTables = 0L; + try (Refs<SSTableReader> refs = getSSTableReaderRefs(keyspaceName, tableName, tokenRange)) + { + for (SSTableReader reader : refs) + { + // Only evaluate unrepaired SSTables. + if (!reader.isRepaired()) + { + long sstableSize = reader.bytesOnDisk(); + totalBytesInUnrepairedSSTables += sstableSize; + // get the bounds of the sstable for this range using the index file but do not actually read it.
+ List<AbstractBounds<PartitionPosition>> bounds = BigTableScanner.makeBounds(reader, Collections.singleton(tokenRange)); + try (BigTableScanner scanner = (BigTableScanner) BigTableScanner.getScanner(reader, Collections.singleton(tokenRange))) + { + assert bounds.size() == 1; + + AbstractBounds<PartitionPosition> bound = bounds.get(0); + long startPosition = scanner.getDataPosition(bound.left); + long endPosition = scanner.getDataPosition(bound.right); + // If end position is 0 we can assume the sstable ended before that token, bound at size of file + if (endPosition == 0) + { + endPosition = sstableSize; + } + + long approximateRangeBytesInSSTable = Math.max(0, endPosition - startPosition); + // get the fraction of the sstable belonging to the range. + approximateUnrepairedBytesForRange += Math.min(approximateRangeBytesInSSTable, sstableSize); + double ratio = approximateRangeBytesInSSTable / (double) sstableSize; + logger.debug("Calculations for {}.{} {}: sstableSize={}, rangeBytesInSSTable={}, startPosition={}, endPosition={}, ratio={}", + keyspaceName, tableName, reader.descriptor.baseFilename(), + FileUtils.stringifyFileSize(sstableSize), FileUtils.stringifyFileSize(approximateRangeBytesInSSTable), startPosition, endPosition, ratio); + } + } + else + { + logger.debug("Skipping over {}.{} {} ({}) because it is repaired", keyspaceName, tableName, reader.descriptor.baseFilename(), FileUtils.stringifyFileSize(reader.bytesOnDisk())); + } + } + } + + // Only consider token range if it had unrepaired sstables or live data in memtables. + if (totalBytesInUnrepairedSSTables > 0L) + { + // TODO: Possibly some anticompaction configuration we want here, where if we detect a large amount of anticompaction we want to reduce the work we do.
+ double ratio = approximateUnrepairedBytesForRange / (double) totalBytesInUnrepairedSSTables; + logger.info("Calculated unrepaired bytes for {}.{} for range {}: sstableSize={}, rangeBytesInSSTables={}, ratio={}", keyspaceName, tableName, tokenRange, + FileUtils.stringifyFileSize(totalBytesInUnrepairedSSTables), FileUtils.stringifyFileSize(approximateUnrepairedBytesForRange), ratio); + + // TODO: split on byte size here, this is currently a bit naive in assuming that data is evenly distributed among the range which may not be the + // right assumption. May want to consider when splitting on these ranges to reevaluate how much data is in the range, but for this + // exists as a demonstration. + if (approximateUnrepairedBytesForRange < subrangeBytes) + { + // accept range as is if less than bytes. + logger.info("Using 1 repair assignment for {}.{} for range {} as {} is less than {}", keyspaceName, tableName, tokenRange, + FileUtils.stringifyFileSize(approximateUnrepairedBytesForRange), subrangeSize); + // TODO: this is a bit repetitive see if can reduce more. + RepairAssignment assignment = new BytesBasedRepairAssignment(tokenRange, keyspaceName, Collections.singleton(tableName), approximateUnrepairedBytesForRange); + if (!canAddAssignment(assignment, targetBytesSoFar, approximateUnrepairedBytesForRange)) + { + return repairAssignments; + } + repairAssignments.add(assignment); + targetBytesSoFar += approximateUnrepairedBytesForRange; + } + else + { + long targetRanges = approximateUnrepairedBytesForRange / subrangeBytes; + // TODO: approximation per range, this is a bit lossy since targetRanges rounds down.
+ long approximateBytesPerSplit = approximateUnrepairedBytesForRange / targetRanges; + logger.info("Splitting {}.{} for range {} into {} sub ranges, approximateBytesPerSplit={}", keyspaceName, tableName, tokenRange, targetRanges, approximateBytesPerSplit); + List<Range<Token>> splitRanges = splitEvenly(tokenRange, Math.toIntExact(targetRanges)); + for (Range<Token> splitRange : splitRanges) + { + RepairAssignment assignment = new BytesBasedRepairAssignment(splitRange, keyspaceName, Collections.singleton(tableName), approximateBytesPerSplit); + if (!canAddAssignment(assignment, targetBytesSoFar, approximateBytesPerSplit)) + { + return repairAssignments; + } + repairAssignments.add(assignment); + targetBytesSoFar += approximateBytesPerSplit; + } + } + } + else + { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspaceName, tableName); + long memtableSize = cfs == null ? 0L : cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize(); + if (memtableSize > 0L) + { + logger.info("Included {}.{} range {}, had no unrepaired SSTables, but memtableSize={}, adding single repair assignment", keyspaceName, tableName, tokenRange, memtableSize); + RepairAssignment assignment = new BytesBasedRepairAssignment(tokenRange, keyspaceName, Collections.singleton(tableName), memtableSize); + if (!canAddAssignment(assignment, targetBytesSoFar, memtableSize)) + { + return repairAssignments; + } + repairAssignments.add(assignment); + targetBytesSoFar += memtableSize; + } + else + { + logger.info("Skipping {}.{} for range {} because it had no unrepaired SSTables and no memtable data", keyspaceName, tableName, tokenRange); + } + } + } + return repairAssignments; + } + + private boolean canAddAssignment(RepairAssignment repairAssignment, long targetBytesSoFar, long bytesToBeAdded) + { + if (targetBytesSoFar + bytesToBeAdded < maxBytesPerScheduleBytes) + { + return true; + } + logger.warn("Refusing to add {} with a target size of {} because it would increase total repair bytes to {} which is greater than {}={}",
+ repairAssignment, FileUtils.stringifyFileSize(bytesToBeAdded), FileUtils.stringifyFileSize(targetBytesSoFar + bytesToBeAdded), MAX_BYTES_PER_SCHEDULE, maxBytesPerSchedule); + return false; + } + + public Collection<Range<Token>> getTokenRanges(boolean primaryRangeOnly, String keyspaceName) + { + // Collect all applicable token ranges + Collection<Range<Token>> wrappedRanges; + if (primaryRangeOnly) + { + wrappedRanges = StorageService.instance.getPrimaryRanges(keyspaceName); + } + else + { + wrappedRanges = StorageService.instance.getLocalRanges(keyspaceName); + } + + // Unwrap each range as we need to account for ranges that overlap the ring + Collection<Range<Token>> ranges = new ArrayList<>(); + for (Range<Token> wrappedRange : wrappedRanges) + { + ranges.addAll(wrappedRange.unwrap()); + } + + return ranges; + } + + public Refs<SSTableReader> getSSTableReaderRefs(String keyspaceName, String tableName, Range<Token> tokenRange) + { + final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspaceName, tableName); + + if (cfs == null) + { + throw new IllegalArgumentException(String.format("Could not resolve ColumnFamilyStore from %s.%s", keyspaceName, tableName)); + } + + Iterable<SSTableReader> sstables = cfs.getTracker().getView().select(SSTableSet.CANONICAL); + SSTableIntervalTree tree = SSTableIntervalTree.build(sstables); + Range<PartitionPosition> r = Range.makeRowRange(tokenRange); + Iterable<SSTableReader> canonicalSSTables = View.sstablesInBounds(r.left, r.right, tree); + + // TODO: Refs.ref may throw IllegalStateException if a selected sstable was released (e.g. compacted away) after the view was read; consider ColumnFamilyStore#selectAndReference, which retries.
+ return Refs.ref(canonicalSSTables); + } + + public static class BytesBasedRepairAssignment extends RepairAssignment + { + private final long approximateBytes; + + public BytesBasedRepairAssignment(Range<Token> tokenRange, String keyspaceName, Set<String> tableNames, long approximateBytes) + { + super(tokenRange, keyspaceName, tableNames); + this.approximateBytes = approximateBytes; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + BytesBasedRepairAssignment that = (BytesBasedRepairAssignment) o; + return approximateBytes == that.approximateBytes; + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), approximateBytes); + } + + @Override + public String toString() + { + return "BytesBasedRepairAssignment{" + + "keyspaceName='" + keyspaceName + '\'' + + ", approximateBytes=" + approximateBytes + + ", tokenRange=" + tokenRange + + ", tableNames=" + tableNames + + '}'; + } + + public long getApproximateBytes() + { + return approximateBytes; + } + } +} diff --git a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java index d63bbace79..f6eab7364c 100644 --- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java @@ -406,4 +406,4 @@ public final class SystemDistributedKeyspace { UNKNOWN, STARTED, SUCCESS } -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index d3764778da..c9b8233c6f 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -50,6 +50,7 @@ import com.google.common.base.Suppliers; import com.fasterxml.jackson.databind.SerializationFeature; import
com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileInputStreamPlus; import org.apache.cassandra.io.util.FileOutputStreamPlus; @@ -686,11 +687,31 @@ public class FBUtilities } } - public static IAutoRepairTokenRangeSplitter newAutoRepairTokenRangeSplitter(String className) throws ConfigurationException + public static IAutoRepairTokenRangeSplitter newAutoRepairTokenRangeSplitter(ParameterizedClass parameterizedClass) throws ConfigurationException { - if (!className.contains(".")) - className = "org.apache.cassandra.repair.autorepair." + className; - return FBUtilities.construct(className, "auto repair token splitter"); + String className = parameterizedClass.class_name.contains(".") ? + parameterizedClass.class_name : + "org.apache.cassandra.repair.autorepair." + parameterizedClass.class_name; + + try + { + Class<?> tokenRangeSplitterClass = Class.forName(className); + try + { + Map<String, String> parameters = parameterizedClass.parameters != null ? parameterizedClass.parameters : Collections.emptyMap(); + // first attempt to initialize with Map arguments. + return (IAutoRepairTokenRangeSplitter) tokenRangeSplitterClass.getConstructor(Map.class).newInstance(parameters); + } + catch (NoSuchMethodException nsme) + { + // fall back on no argument constructor.
+ return (IAutoRepairTokenRangeSplitter) tokenRangeSplitterClass.getConstructor().newInstance(); + } + } + catch (Exception ex) + { + throw new ConfigurationException("Unable to create instance of IAutoRepairTokenRangeSplitter for " + className, ex); + } } /** diff --git a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java index 07eb9070fe..1cff3569a8 100644 --- a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java +++ b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java @@ -20,6 +20,9 @@ package org.apache.cassandra.repair.autorepair; import java.util.EnumMap; import java.util.Objects; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import com.google.common.collect.ImmutableSet; @@ -30,7 +33,9 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.config.DurationSpec; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.repair.autorepair.AutoRepairConfig.Options; @@ -440,14 +445,24 @@ public class AutoRepairConfigTest extends CQLTester { Options defaultOptions = Options.getDefaultOptions(); - assertEquals(DefaultAutoRepairTokenSplitter.class.getName(),defaultOptions.token_range_splitter); + ParameterizedClass expectedDefault = new ParameterizedClass(DefaultAutoRepairTokenSplitter.class.getName(), Collections.emptyMap()); + + assertEquals(expectedDefault, defaultOptions.token_range_splitter); assertEquals(DefaultAutoRepairTokenSplitter.class.getName(),
FBUtilities.newAutoRepairTokenRangeSplitter(defaultOptions.token_range_splitter).getClass().getName()); } + @Test + public void testTokenRangeSplitterAcceptingMap() + { + Map<String, String> splitterOptions = new HashMap<>(Collections.singletonMap(UnrepairedBytesBasedTokenRangeSplitter.SUBRANGE_SIZE, "1MiB")); + config.global_settings.token_range_splitter = new ParameterizedClass(UnrepairedBytesBasedTokenRangeSplitter.class.getName(), splitterOptions); + assertEquals(UnrepairedBytesBasedTokenRangeSplitter.class, FBUtilities.newAutoRepairTokenRangeSplitter(config.global_settings.token_range_splitter).getClass()); + } + @Test(expected = ConfigurationException.class) public void testInvalidTokenRangeSplitter() { - assertEquals(DefaultAutoRepairTokenSplitter.class.getName(), FBUtilities.newAutoRepairTokenRangeSplitter("invalid-class").getClass().getName()); + FBUtilities.newAutoRepairTokenRangeSplitter(new ParameterizedClass("invalid-class", Collections.emptyMap())); } @Test diff --git a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java index 1604055e4b..96b92e10ed 100644 --- a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java +++ b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.repair.autorepair; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -36,6 +37,7 @@ import org.apache.cassandra.cql3.statements.schema.TableAttributes; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.repair.RepairRunnable; +import org.apache.cassandra.repair.autorepair.IAutoRepairTokenRangeSplitter.RepairAssignment; import org.apache.cassandra.schema.AutoRepairParams; import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.service.StorageService; @@ -540,13 +542,12 @@ public class AutoRepairParameterizedTest extends CQLTester {
Collection<Range<Token>> tokens = StorageService.instance.getPrimaryRanges(KEYSPACE); assertEquals(1, tokens.size()); - List<Range<Token>> expectedToken = new ArrayList<>(); - expectedToken.addAll(tokens); + List<Range<Token>> expectedToken = new ArrayList<>(tokens); - List<Pair<Token, Token>> ranges = new DefaultAutoRepairTokenSplitter().getRange(repairType, true, KEYSPACE, TABLE); - assertEquals(1, ranges.size()); - assertEquals(expectedToken.get(0).left, ranges.get(0).left); - assertEquals(expectedToken.get(0).right, ranges.get(0).right); + List<RepairAssignment> assignments = new DefaultAutoRepairTokenSplitter().getRepairAssignments(repairType, true, KEYSPACE, Collections.singleton(TABLE)); + assertEquals(1, assignments.size()); + assertEquals(expectedToken.get(0).left, assignments.get(0).getTokenRange().left); + assertEquals(expectedToken.get(0).right, assignments.get(0).getTokenRange().right); } @Test @@ -554,13 +555,13 @@ public class AutoRepairParameterizedTest extends CQLTester { Collection<Range<Token>> tokens = StorageService.instance.getPrimaryRanges(KEYSPACE); assertEquals(1, tokens.size()); - List<Range<Token>> expectedToken = new ArrayList<>(); - expectedToken.addAll(tokens); + // TODO: validate the tokens + List<Range<Token>> expectedToken = new ArrayList<>(tokens); AutoRepairConfig config = AutoRepairService.instance.getAutoRepairConfig(); config.setRepairSubRangeNum(repairType, 4); - List<Pair<Token, Token>> ranges = new DefaultAutoRepairTokenSplitter().getRange(repairType, true, KEYSPACE, TABLE); - assertEquals(4, ranges.size()); + List<RepairAssignment> assignments = new DefaultAutoRepairTokenSplitter().getRepairAssignments(repairType, true, KEYSPACE, Collections.singleton(TABLE)); + assertEquals(4, assignments.size()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
