This is an automated email from the ASF dual-hosted git repository.

tolbertam pushed a commit to branch auto_repair_v2_on_4_1_splitter_changes
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 443148eeadceaf8b2881b46fb7ae0b983e0036bf
Author: Andy Tolbert <[email protected]>
AuthorDate: Wed Sep 25 18:05:30 2024 -0500

    Update IAutoRepairTokenRangeSplitter API to be more flexible
    
    Updates the IAutoRepairTokenRangeSplitter API to take a set of tables
    instead of a single table and to return a list of RepairAssignments.
    
    This gives splitters more control over how repairs are issued by the
    AutoRepair framework, allowing repairs to be grouped by specific tables
    or issued as a single repair at the keyspace level.
    
    Also changes the token_range_splitter option to a ParameterizedClass,
    so custom splitters can have their own configuration parameters.
    
    Update AutoRepair to account for these changes.
    
    Update DefaultAutoRepairTokenSplitter to account for the change and to
    also honor the getRepairByKeyspace option.  This also improves the
    filtering of tables that should not be repaired, which previously was
    not applied when getRepairByKeyspace was enabled.
    
    Move the logic to split token ranges evenly into AutoRepairUtils so it
    can be reused by other splitters.
    
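    Adds a prototype UnrepairedBytesBasedTokenRangeSplitter that
    demonstrates consuming splitter-specific parameters to bound the
    repair assignments it returns.
    
    Also adds a preview_repaired repair type (PreviewRepairedState), makes
    ParameterizedClass Serializable, and exposes BigTableScanner.makeBounds
    and a new BigTableScanner.getDataPosition helper.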
---
 .../cassandra/config/ParameterizedClass.java       |   3 +-
 .../io/sstable/format/big/BigTableScanner.java     |  44 ++-
 .../cassandra/repair/autorepair/AutoRepair.java    | 234 ++++++++-------
 .../repair/autorepair/AutoRepairConfig.java        |  15 +-
 .../repair/autorepair/AutoRepairState.java         |  21 ++
 .../repair/autorepair/AutoRepairUtils.java         |  31 ++
 .../autorepair/DefaultAutoRepairTokenSplitter.java |  59 ++--
 .../autorepair/IAutoRepairTokenRangeSplitter.java  |  76 ++++-
 .../UnrepairedBytesBasedTokenRangeSplitter.java    | 332 +++++++++++++++++++++
 .../schema/SystemDistributedKeyspace.java          |   2 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  29 +-
 .../repair/autorepair/AutoRepairConfigTest.java    |  20 +-
 .../autorepair/AutoRepairParameterizedTest.java    |  21 +-
 13 files changed, 716 insertions(+), 171 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java 
b/src/java/org/apache/cassandra/config/ParameterizedClass.java
index 9b001786e3..a101caa92c 100644
--- a/src/java/org/apache/cassandra/config/ParameterizedClass.java
+++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.config;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -27,7 +28,7 @@ import org.apache.cassandra.utils.Shared;
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 @Shared(scope = SIMULATION)
-public class ParameterizedClass
+public class ParameterizedClass implements Serializable
 {
     public static final String CLASS_NAME = "class_name";
     public static final String PARAMETERS = "parameters";
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 235b9b1a71..c1396534a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -117,7 +117,7 @@ public class BigTableScanner implements ISSTableScanner
         this.listener = listener;
     }
 
-    private static List<AbstractBounds<PartitionPosition>> 
makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
+    public static List<AbstractBounds<PartitionPosition>> 
makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
     {
         List<AbstractBounds<PartitionPosition>> boundsList = new 
ArrayList<>(tokenRanges.size());
         for (Range<Token> range : Range.normalize(tokenRanges))
@@ -209,6 +209,50 @@ public class BigTableScanner implements ISSTableScanner
         }
     }
 
+    /**
+     * Returns the data file position of the first partition whose key sorts strictly after {@code position},
+     * without seeking the data file.
+     * <p>
+     * This does move the index file pointer: the index is scanned forward from
+     * {@code sstable.getIndexScanPosition(position)} and, when a match is found, is left positioned at the start
+     * of the matching index entry, as callers will presumably continue scanning forward from there.
+     *
+     * @param position position to look up in the index.
+     * @return the data file offset of the first partition after {@code position}, or {@code 0} if the index has
+     *         no entry after {@code position}.
+     * @throws CorruptSSTableException if the index file cannot be read; the sstable is marked suspect first.
+     */
+    public long getDataPosition(PartitionPosition position) throws CorruptSSTableException
+    {
+        long indexPosition = sstable.getIndexScanPosition(position);
+        ifile.seek(indexPosition);
+        try
+        {
+            while (!ifile.isEOF())
+            {
+                indexPosition = ifile.getFilePointer();
+                DecoratedKey indexDecoratedKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+                if (indexDecoratedKey.compareTo(position) > 0)
+                {
+                    // Found: read the data position, then rewind the index to the start of this entry since
+                    // callers will presumably seek further forward when moving on to the next position.
+                    long dataPosition = RowIndexEntry.Serializer.readPosition(ifile);
+                    ifile.seek(indexPosition);
+                    return dataPosition;
+                }
+                RowIndexEntry.Serializer.skip(ifile, sstable.descriptor.version);
+            }
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        }
+        // NOTE(review): 0 is returned when no index entry sorts after position (e.g. it is past the last key).
+        // Callers computing byte spans from two positions should not treat this as the end of the data file.
+        return 0L;
+    }
+
     public void close()
     {
         try
diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java 
b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java
index 69db9e609a..67dedd8256 100644
--- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java
+++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,9 +34,11 @@ import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.repair.RepairRunnable;
+import 
org.apache.cassandra.repair.autorepair.IAutoRepairTokenRangeSplitter.RepairAssignment;
 import org.apache.cassandra.utils.Pair;
 
 import org.slf4j.Logger;
@@ -203,149 +206,142 @@ public class AutoRepair
                     }
 
                     
repairState.setRepairKeyspaceCount(repairState.getRepairKeyspaceCount() + 1);
-                    List<String> tablesToBeRepaired = 
retrieveTablesToBeRepaired(keyspace, repairType, repairState);
-                    shuffleFunc.accept(tablesToBeRepaired);
-                    for (String tableName : tablesToBeRepaired)
+                    List<String> tablesToBeRepairedList = 
retrieveTablesToBeRepaired(keyspace, repairType, repairState);
+                    // TODO: Should we shuffle repair assignments instead of 
tables?  Side effect of that is will
+                    // shuffle the ranges of the table being repaired as well.
+                    shuffleFunc.accept(tablesToBeRepairedList);
+                    Set<String> tablesToBeRepaired = new 
LinkedHashSet<>(tablesToBeRepairedList);
+
+                    String keyspaceName = keyspace.getName();
+                    List<RepairAssignment> repairAssignments = 
tokenRangeSplitters.get(repairType).getRepairAssignments(repairType, 
primaryRangeOnly, keyspaceName, tablesToBeRepaired);
+
+                    int totalRepairAssignments = repairAssignments.size();
+                    long keyspaceStartTime = timeFunc.get();
+                    for (RepairAssignment repairAssignment : repairAssignments)
                     {
-                        String keyspaceName = keyspace.getName();
                         try
                         {
-                            List<Pair<Token, Token>> subRangesToBeRepaired = 
tokenRangeSplitters.get(repairType).getRange(repairType, primaryRangeOnly, 
keyspaceName, tableName);
-                            int totalSubRanges = subRangesToBeRepaired.size();
-
-                            ColumnFamilyStore columnFamilyStore = 
keyspace.getColumnFamilyStore(tableName);
-                            if 
(!columnFamilyStore.metadata().params.automatedRepair.get(repairType).repairEnabled())
+                            boolean skipAssignment = false;
+                            for (String tableName : 
repairAssignment.getTableNames())
                             {
-                                logger.info("Repair is disabled for keyspace 
{} for tables: {}", keyspaceName, tableName);
-                                
repairState.setTotalDisabledTablesRepairCount(repairState.getTotalDisabledTablesRepairCount()
 + 1);
-                                continue;
+                                ColumnFamilyStore columnFamilyStore = 
keyspace.getColumnFamilyStore(tableName);
+                                // this is done to make autorepair safe as 
running repair on table with more sstables
+                                // may have its own challenges
+                                int size = 
columnFamilyStore.getLiveSSTables().size();
+                                if (size > 
config.getRepairSSTableCountHigherThreshold(repairType))
+                                {
+                                    // TODO: Maybe just exclude assignment for 
this table?
+                                    // TODO: how should this effect the 
calculation of skippedTokenRanges?
+                                    logger.info("Too many SSTables for repair 
for table {}.{}, not doing repair on RepairAssignment {} " +
+                                                "totalSSTables {}", 
keyspaceName, tableName, repairAssignment, 
columnFamilyStore.getLiveSSTables().size());
+                                    skippedTokenRanges += 1;
+                                    skipAssignment = true;
+                                    break;
+                                }
                             }
-                            // this is done to make autorepair safe as running 
repair on table with more sstables
-                            // may have its own challenges
-                            int size = 
columnFamilyStore.getLiveSSTables().size();
-                            if (size > 
config.getRepairSSTableCountHigherThreshold(repairType))
+
+                            if (skipAssignment)
                             {
-                                logger.info("Too many SSTables for repair, not 
doing repair on table {}.{} " +
-                                            "totalSSTables {}", keyspaceName, 
tableName, columnFamilyStore.getLiveSSTables().size());
-                                skippedTokenRanges += totalSubRanges;
                                 continue;
                             }
 
-                            if (config.getRepairByKeyspace(repairType))
+                            long tableStartTime = timeFunc.get();
+                            Set<Range<Token>> ranges = new HashSet<>();
+                            int totalProcessedAssignments = 0;
+
+                            if (!config.isAutoRepairEnabled(repairType))
                             {
-                                logger.info("Repair keyspace {} for tables: 
{}", keyspaceName, tablesToBeRepaired);
+                                logger.error("Auto-repair for type {} is 
disabled hence not running repair", repairType);
+                                repairState.setRepairInProgress(false);
+                                return;
                             }
-                            else
+
+                            if 
(AutoRepairUtils.keyspaceMaxRepairTimeExceeded(repairType, keyspaceStartTime, 
tablesToBeRepaired.size()))
                             {
-                                logger.info("Repair table {}.{}", 
keyspaceName, tableName);
+                                skippedTokenRanges += totalRepairAssignments - 
totalProcessedAssignments;
+                                logger.info("Keyspace took too much time to 
repair hence skipping it {}",
+                                            keyspaceName);
+                                break;
                             }
-                            long tableStartTime = timeFunc.get();
-                            Set<Range<Token>> ranges = new HashSet<>();
-                            int totalProcessedSubRanges = 0;
-                            for (Pair<Token, Token> token : 
subRangesToBeRepaired)
+
+                            // TODO: now that we are doing things assignment 
wise, how do reason about this?
+                            /*
+                            if 
(AutoRepairUtils.tableMaxRepairTimeExceeded(repairType, tableStartTime))
                             {
-                                if (!config.isAutoRepairEnabled(repairType))
-                                {
-                                    logger.error("Auto-repair for type {} is 
disabled hence not running repair", repairType);
-                                    repairState.setRepairInProgress(false);
-                                    return;
-                                }
+                                skippedTokenRanges += totalSubRanges - 
totalProcessedAssignments;
+                                logger.info("Table took too much time to 
repair hence skipping it {}.{}",
+                                            keyspaceName, tableName);
+                                break;
+                            }*/
 
-                                if (config.getRepairByKeyspace(repairType))
+                            Range<Token> tokenRange = 
repairAssignment.getTokenRange();
+                            logger.debug("Current Token Left side {}, right 
side {}",
+                                         tokenRange.left.toString(),
+                                         tokenRange.right.toString());
+
+                            ranges.add(repairAssignment.getTokenRange());
+                            totalProcessedAssignments++;
+                            if ((totalProcessedAssignments % 
config.getRepairThreads(repairType) == 0) ||
+                                (totalProcessedAssignments == 
totalRepairAssignments))
+                            {
+                                int retryCount = 0;
+                                Future<?> f = null;
+                                while (retryCount <= 
config.getRepairMaxRetries())
                                 {
-                                    if 
(AutoRepairUtils.keyspaceMaxRepairTimeExceeded(repairType, tableStartTime, 
tablesToBeRepaired.size()))
+                                    // TODO have getRepairRunnable accept a 
Set of tables instead of list.
+                                    RepairRunnable task = 
repairState.getRepairRunnable(keyspaceName,
+                                                                               
         Lists.newArrayList(repairAssignment.getTableNames()),
+                                                                               
         ranges, primaryRangeOnly);
+                                    repairState.resetWaitCondition();
+                                    f = 
repairRunnableExecutors.get(repairType).submit(task);
+                                    try
                                     {
-                                        skippedTokenRanges += totalSubRanges - 
totalProcessedSubRanges;
-                                        logger.info("Keyspace took too much 
time to repair hence skipping it {}",
-                                                    keyspaceName);
-                                        break;
+                                        
repairState.waitForRepairToComplete(config.getRepairSessionTimeout(repairType));
                                     }
-                                }
-                                else
-                                {
-                                    if 
(AutoRepairUtils.tableMaxRepairTimeExceeded(repairType, tableStartTime))
+                                    catch (InterruptedException e)
                                     {
-                                        skippedTokenRanges += totalSubRanges - 
totalProcessedSubRanges;
-                                        logger.info("Table took too much time 
to repair hence skipping it {}.{}",
-                                                    keyspaceName, tableName);
-                                        break;
+                                        logger.error("Exception in cond 
await:", e);
                                     }
-                                }
-                                Token childStartToken = token.left;
-                                Token childEndToken = token.right;
-                                logger.debug("Current Token Left side {}, 
right side {}", childStartToken
-                                                                               
           .toString(), childEndToken.toString());
-
-                                ranges.add(new Range<>(childStartToken, 
childEndToken));
-                                totalProcessedSubRanges++;
-                                if ((totalProcessedSubRanges % 
config.getRepairThreads(repairType) == 0) ||
-                                    (totalProcessedSubRanges == 
totalSubRanges))
-                                {
-                                    int retryCount = 0;
-                                    Future<?> f = null;
-                                    while (retryCount <= 
config.getRepairMaxRetries())
+                                    if (repairState.isSuccess())
                                     {
-                                        RepairRunnable task = 
repairState.getRepairRunnable(keyspaceName,
-                                                                               
             config.getRepairByKeyspace(repairType) ? tablesToBeRepaired : 
ImmutableList.of(tableName),
-                                                                               
             ranges, primaryRangeOnly);
-                                        repairState.resetWaitCondition();
-                                        f = 
repairRunnableExecutors.get(repairType).submit(task);
-                                        try
-                                        {
-                                            
repairState.waitForRepairToComplete(config.getRepairSessionTimeout(repairType));
-                                        }
-                                        catch (InterruptedException e)
-                                        {
-                                            logger.error("Exception in cond 
await:", e);
-                                        }
-                                        if (repairState.isSuccess())
-                                        {
-                                            break;
-                                        }
-                                        else if (retryCount < 
config.getRepairMaxRetries())
-                                        {
-                                            boolean cancellationStatus = 
f.cancel(true);
-                                            logger.warn("Repair failed for 
range {}-{} for {}.{} with cancellationStatus: {} retrying after {} seconds...",
-                                                        childStartToken, 
childEndToken,
-                                                        keyspaceName, 
config.getRepairByKeyspace(repairType) ? tablesToBeRepaired : tableName,
-                                                        cancellationStatus, 
config.getRepairRetryBackoff().toSeconds());
-                                            
sleepFunc.accept(config.getRepairRetryBackoff().toSeconds(), TimeUnit.SECONDS);
-                                        }
-                                        retryCount++;
+                                        break;
                                     }
-                                    //check repair status
-                                    if (repairState.isSuccess())
+                                    else if (retryCount < 
config.getRepairMaxRetries())
                                     {
-                                        logger.info("Repair completed for 
range {}-{} for {}.{}, total subranges: {}," +
-                                                    "processed subranges: {}", 
childStartToken, childEndToken,
-                                                    keyspaceName, 
config.getRepairByKeyspace(repairType) ? tablesToBeRepaired : tableName, 
totalSubRanges, totalProcessedSubRanges);
-                                        succeededTokenRanges += ranges.size();
+                                        boolean cancellationStatus = 
f.cancel(true);
+                                        logger.warn("Repair failed for range 
{}-{} for {} tables {} with cancellationStatus: {} retrying after {} 
seconds...",
+                                                    tokenRange.left, 
tokenRange.right,
+                                                    keyspaceName, 
repairAssignment.getTableNames(),
+                                                    cancellationStatus, 
config.getRepairRetryBackoff().toSeconds());
+                                        
sleepFunc.accept(config.getRepairRetryBackoff().toSeconds(), TimeUnit.SECONDS);
                                     }
-                                    else
+                                    retryCount++;
+                                }
+                                //check repair status
+                                if (repairState.isSuccess())
+                                {
+                                    logger.info("Repair completed for range 
{}-{} for {} tables {}, total assignments: {}," +
+                                                "processed assignments: {}", 
tokenRange.left, tokenRange.right,
+                                                keyspaceName, 
repairAssignment.getTableNames(), totalRepairAssignments, 
totalProcessedAssignments);
+                                    succeededTokenRanges += ranges.size();
+                                }
+                                else
+                                {
+                                    boolean cancellationStatus = true;
+                                    if (f != null)
                                     {
-                                        boolean cancellationStatus = true;
-                                        if (f != null)
-                                        {
-                                            cancellationStatus = 
f.cancel(true);
-                                        }
-                                        //in the future we can add retry, etc.
-                                        logger.error("Repair failed for range 
{}-{} for {}.{} after {} retries, total subranges: {}," +
-                                                     "processed subranges: {}, 
cancellationStatus: {}", childStartToken.toString(), childEndToken.toString(), 
keyspaceName,
-                                                     
config.getRepairByKeyspace(repairType) ? tablesToBeRepaired : tableName, 
retryCount, totalSubRanges, totalProcessedSubRanges, cancellationStatus);
-                                        failedTokenRanges += ranges.size();
+                                        cancellationStatus = f.cancel(true);
                                     }
-                                    ranges.clear();
+                                    //in the future we can add retry, etc.
+                                    logger.error("Repair failed for range 
{}-{} for {} tables {} after {} retries, total assignments: {}," +
+                                                 "processed assignments: {}, 
cancellationStatus: {}", tokenRange.left, tokenRange.right, keyspaceName,
+                                                 
repairAssignment.getTableNames(), retryCount, totalRepairAssignments, 
totalProcessedAssignments, cancellationStatus);
+                                    failedTokenRanges += ranges.size();
                                 }
+                                ranges.clear();
                             }
-                            if (config.getRepairByKeyspace(repairType))
-                            {
-                                logger.info("Repair completed for keyspace {}, 
tables: {}", keyspaceName, tablesToBeRepaired);
-                                break;
-                            }
-                            else
-                            {
-                                logger.info("Repair completed for {}.{}", 
keyspaceName, tableName);
-                            }
+                            // TODO: this format used a bit, maybe use 
RepairAssignment.toString instead
+                            logger.info("Repair completed for {} tables {}, 
range {}", keyspaceName, repairAssignment.getTableNames(), 
repairAssignment.getTokenRange());
                         }
                         catch (Exception e)
                         {
@@ -398,6 +394,14 @@ public class AutoRepair
             
repairState.setTotalTablesConsideredForRepair(repairState.getTotalTablesConsideredForRepair()
 + 1);
             TableMetadata tableMetadata = iter.next();
             String tableName = tableMetadata.name;
+
+            ColumnFamilyStore columnFamilyStore = 
keyspace.getColumnFamilyStore(tableName);
+            if 
(!columnFamilyStore.metadata().params.automatedRepair.get(repairType).repairEnabled())
+            {
+                logger.info("Repair is disabled for keyspace {} for tables: 
{}", keyspace.getName(), tableName);
+                
repairState.setTotalDisabledTablesRepairCount(repairState.getTotalDisabledTablesRepairCount()
 + 1);
+                continue;
+            }
             tablesToBeRepaired.add(tableName);
 
             // See if we should repair MVs as well that are associated with 
this given table
diff --git 
a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java 
b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java
index 97868a594f..7accc9ecd7 100644
--- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java
+++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairConfig.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair.autorepair;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -32,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 public class AutoRepairConfig implements Serializable
@@ -59,7 +61,8 @@ public class AutoRepairConfig implements Serializable
     public enum RepairType
     {
         full,
-        incremental;
+        incremental,
+        preview_repaired;
 
         public static AutoRepairState getAutoRepairState(RepairType repairType)
         {
@@ -69,6 +72,8 @@ public class AutoRepairConfig implements Serializable
                     return new FullRepairState();
                 case incremental:
                     return new IncrementalRepairState();
+                case preview_repaired:
+                    return new PreviewRepairedState();
             }
 
             throw new IllegalArgumentException("Invalid repair type: " + 
repairType);
@@ -280,7 +285,7 @@ public class AutoRepairConfig implements Serializable
         return applyOverrides(repairType, opt -> opt.force_repair_new_node);
     }
 
-    public String getTokenRangeSplitter(RepairType repairType)
+    public ParameterizedClass getTokenRangeSplitter(RepairType repairType)
     {
         return applyOverrides(repairType, opt -> opt.token_range_splitter);
     }
@@ -340,7 +345,7 @@ public class AutoRepairConfig implements Serializable
             opts.force_repair_new_node = false;
             opts.table_max_repair_time = new 
DurationSpec.IntSecondsBound("6h");
             opts.mv_repair_enabled = false;
-            opts.token_range_splitter = 
DefaultAutoRepairTokenSplitter.class.getName();
+            opts.token_range_splitter = new 
ParameterizedClass(DefaultAutoRepairTokenSplitter.class.getName(), 
Collections.emptyMap());
             opts.initial_scheduler_delay = new 
DurationSpec.IntSecondsBound("15m"); // 15 minutes
             opts.repair_session_timeout = new 
DurationSpec.IntSecondsBound("3h"); // 3 hours
 
@@ -404,9 +409,9 @@ public class AutoRepairConfig implements Serializable
         // the default is 'true'.
         // This flag determines whether the auto-repair framework needs to run 
anti-entropy, a.k.a, repair on the MV table or not.
         public volatile Boolean mv_repair_enabled;
-        // the default is DefaultAutoRepairTokenSplitter.class.getName(). The 
class should implement IAutoRepairTokenRangeSplitter.
+        // the default is DefaultAutoRepairTokenSplitter. The class should 
implement IAutoRepairTokenRangeSplitter.
         // The default implementation splits the tokens based on the token 
ranges owned by this node divided by the number of 'number_of_subranges'
-        public volatile String token_range_splitter;
+        public volatile ParameterizedClass token_range_splitter;
         // the minimum delay after a node starts before the scheduler starts 
running repair
         public volatile DurationSpec.IntSecondsBound initial_scheduler_delay;
         // repair session timeout - this is applicable for each repair session
diff --git 
a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java 
b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java
index a4869c7742..a9f9113750 100644
--- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java
+++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java
@@ -285,6 +285,37 @@ public abstract class AutoRepairState implements ProgressListener
     }
 }
 
+/**
+ * Auto-repair state for {@link RepairType#preview_repaired}.
+ * <p>
+ * Sessions are requested with {@code PreviewKind.REPAIRED}, i.e. as a preview over the data already marked
+ * repaired rather than as a regular repair. NOTE(review): presumably intended to detect mismatches in
+ * repaired data; confirm the intended semantics and document them on the RepairType.
+ */
+class PreviewRepairedState extends AutoRepairState
+{
+    public PreviewRepairedState()
+    {
+        super(RepairType.preview_repaired);
+    }
+
+    /**
+     * Builds a {@code PreviewKind.REPAIRED} session for {@code tables} of {@code keyspace} over {@code ranges},
+     * using {@code RepairParallelism.PARALLEL} and the configured number of repair threads for this repair type.
+     */
+    @Override
+    public RepairRunnable getRepairRunnable(String keyspace, List<String> tables, Set<Range<Token>> ranges, boolean primaryRangeOnly)
+    {
+        RepairOption option = new RepairOption(RepairParallelism.PARALLEL, primaryRangeOnly, false, false,
+                                               AutoRepairService.instance.getAutoRepairConfig().getRepairThreads(repairType), ranges,
+                                               !ranges.isEmpty(), false, false, PreviewKind.REPAIRED, false, true, false, false);
+
+        option.getColumnFamilies().addAll(tables);
+
+        return getRepairRunnable(keyspace, option);
+    }
+}
+
 class IncrementalRepairState extends AutoRepairState
 {
     public IncrementalRepairState()
diff --git 
a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java 
b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java
index 9ef64c9bac..636a002507 100644
--- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java
+++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairUtils.java
@@ -32,6 +32,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.LocalStrategy;
 
 import org.slf4j.Logger;
@@ -832,4 +834,33 @@ public class AutoRepairUtils
         }
         return allMvs;
     }
+
+    /**
+     * Splits {@code tokenRange} into at most {@code numberOfSplits} contiguous sub-ranges of nearly equal width.
+     * <p>
+     * Token values are treated as {@code long}s on a ring of 2^64 tokens. The width of {@code (left, right]} is
+     * measured clockwise with {@link java.math.BigInteger}, so two cases split without overflow:
+     * wrap-around ranges ({@code right < left}) and ranges wider than {@code Long.MAX_VALUE}.
+     * A range with {@code left == right} is split as the full ring.
+     * Remainder tokens are spread over the sub-ranges instead of all landing in the last one.
+     * When the range holds fewer tokens than {@code numberOfSplits}, empty sub-ranges are dropped,
+     * since Cassandra's {@code Range} treats equal bounds as the whole ring.
+     *
+     * @param tokenRange     the range to split; both bounds must have {@code Long} token values
+     * @param numberOfSplits the requested number of sub-ranges; must be at least 1
+     * @return the sub-ranges in ring order, starting at {@code tokenRange.left} and ending at {@code tokenRange.right}
+     * @throws IllegalArgumentException if {@code numberOfSplits < 1} or a bound is not a {@code Long}-valued token
+     */
+    public static List<Range<Token>> splitEvenly(Range<Token> tokenRange, int numberOfSplits)
+    {
+        if (numberOfSplits < 1)
+        {
+            throw new IllegalArgumentException("numberOfSplits must be at least 1, got " + numberOfSplits);
+        }
+
+        Object leftValue = tokenRange.left.getTokenValue();
+        Object rightValue = tokenRange.right.getTokenValue();
+        if (!(leftValue instanceof Long) || !(rightValue instanceof Long))
+        {
+            throw new IllegalArgumentException("splitEvenly requires tokens with long values, got " + tokenRange);
+        }
+        long left = (Long) leftValue;
+        long right = (Long) rightValue;
+
+        // Clockwise width of (left, right] on the 2^64 token ring. BigInteger avoids the overflow that plain long
+        // subtraction hits for wrap-around ranges and for ranges spanning more than half of the ring.
+        java.math.BigInteger width = java.math.BigInteger.valueOf(right).subtract(java.math.BigInteger.valueOf(left));
+        if (width.signum() <= 0)
+        {
+            width = width.add(java.math.BigInteger.ONE.shiftLeft(64));
+        }
+        java.math.BigInteger origin = java.math.BigInteger.valueOf(left);
+        java.math.BigInteger splits = java.math.BigInteger.valueOf(numberOfSplits);
+
+        List<Range<Token>> splitRanges = new ArrayList<>(numberOfSplits);
+        long curLeft = left;
+        for (int i = 1; i <= numberOfSplits; i++)
+        {
+            // longValue() keeps the low 64 bits, which wraps boundaries past Long.MAX_VALUE back onto the ring.
+            long curRight = i == numberOfSplits
+                            ? right
+                            : origin.add(width.multiply(java.math.BigInteger.valueOf(i)).divide(splits)).longValue();
+            if (curLeft == curRight && left != right)
+            {
+                // Empty slice of a range narrower than numberOfSplits tokens; (t, t] would mean the whole ring.
+                continue;
+            }
+
+            Token childStartToken = StorageService.instance.getTokenMetadata()
+                                    .partitioner.getTokenFactory().fromString(Long.toString(curLeft));
+            Token childEndToken = StorageService.instance.getTokenMetadata()
+                                  .partitioner.getTokenFactory().fromString(Long.toString(curRight));
+            logger.debug("Current Token Left side {}, right side {}", childStartToken, childEndToken);
+            splitRanges.add(new Range<>(childStartToken, childEndToken));
+            curLeft = curRight;
+        }
+        return splitRanges;
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java
 
b/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java
index 7c254fd358..a58aede3af 100644
--- 
a/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java
+++ 
b/src/java/org/apache/cassandra/repair/autorepair/DefaultAutoRepairTokenSplitter.java
@@ -20,7 +20,10 @@ package org.apache.cassandra.repair.autorepair;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.cassandra.service.AutoRepairService;
 
@@ -33,15 +36,18 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
+import static 
org.apache.cassandra.repair.autorepair.AutoRepairUtils.splitEvenly;
+
+/**
+ * Default {@link IAutoRepairTokenRangeSplitter}: splits every token range this node repairs into
+ * {@code getRepairSubRangeNum(repairType)} pieces with {@link AutoRepairUtils#splitEvenly} and emits one
+ * {@link IAutoRepairTokenRangeSplitter.RepairAssignment} per piece. Each piece either covers all requested tables
+ * at once (when {@code getRepairByKeyspace(repairType)} is enabled) or gets a separate assignment per table.
+ */
 public class DefaultAutoRepairTokenSplitter implements 
IAutoRepairTokenRangeSplitter
 {
     private static final Logger logger = 
LoggerFactory.getLogger(DefaultAutoRepairTokenSplitter.class);
 
 
+    /**
+     * Builds the repair assignments for {@code keyspaceName}.
+     * <p>
+     * The ranges to split depend on {@code primaryRangeOnly}:
+     * {@code StorageService.getPrimaryRanges} when it is set, {@code StorageService.getLocalReplicas(...).ranges()}
+     * otherwise. Each range is split into {@code getRepairSubRangeNum(repairType)} sub-ranges.
+     *
+     * @return when repair-by-keyspace is enabled for {@code repairType}, one assignment per sub-range covering all
+     *         of {@code tableNames}; otherwise one single-table assignment per (table, sub-range) pair, grouped by
+     *         table
+     */
     @Override
-    public List<Pair<Token, Token>> getRange(AutoRepairConfig.RepairType 
repairType, boolean primaryRangeOnly, String keyspaceName, String tableName)
+    public List<RepairAssignment> 
getRepairAssignments(AutoRepairConfig.RepairType repairType, boolean 
primaryRangeOnly, String keyspaceName, Set<String> tableNames)
     {
-        List<Pair<Token, Token>> range = new ArrayList<>();
+        AutoRepairConfig config = 
AutoRepairService.instance.getAutoRepairConfig();
+        List<RepairAssignment> repairAssignments = new ArrayList<>();
 
         Collection<Range<Token>> tokens = 
StorageService.instance.getPrimaryRanges(keyspaceName);
         if (!primaryRangeOnly)
@@ -49,41 +55,36 @@ public class DefaultAutoRepairTokenSplitter implements 
IAutoRepairTokenRangeSpli
             // if we need to repair non-primary token ranges, then change the 
tokens accrodingly
             tokens = 
StorageService.instance.getLocalReplicas(keyspaceName).ranges();
         }
-        AutoRepairConfig config = 
AutoRepairService.instance.getAutoRepairConfig();
         int numberOfSubranges = config.getRepairSubRangeNum(repairType);
+
+        // chooses between the two assignment shapes built below
+        boolean byKeyspace = config.getRepairByKeyspace(repairType);
+
+        // split every range into numberOfSubranges pieces and collect them all.
+        List<Range<Token>> allRanges = new ArrayList<>();
         for (Range<Token> token : tokens)
         {
-            Murmur3Partitioner.LongToken l = (Murmur3Partitioner.LongToken) 
(token.left);
-            Murmur3Partitioner.LongToken r = (Murmur3Partitioner.LongToken) 
(token.right);
-            Token parentStartToken = StorageService.instance.getTokenMetadata()
                                     
.partitioner.getTokenFactory().fromString("" + l.getTokenValue());
-            Token parentEndToken = StorageService.instance.getTokenMetadata()
                                   
.partitioner.getTokenFactory().fromString("" + r.getTokenValue());
-            logger.debug("Parent Token Left side {}, right side {}", 
parentStartToken.toString(),
                         parentEndToken.toString());
+            allRanges.addAll(splitEvenly(token, numberOfSubranges));
+        }
 
-            long left = (Long) l.getTokenValue();
-            long right = (Long) r.getTokenValue();
-            long repairTokenWidth = (right - left) / numberOfSubranges;
-            for (int i = 0; i < numberOfSubranges; i++)
+        if (byKeyspace)
+        {
+            for (Range<Token> splitRange : allRanges)
             {
-                long curLeft = left + (i * repairTokenWidth);
-                long curRight = curLeft + repairTokenWidth;
-
-                if ((i + 1) == numberOfSubranges)
+                // one assignment per sub-range covering all of the requested tables
+                repairAssignments.add(new RepairAssignment(splitRange, 
keyspaceName, tableNames));
+            }
+        }
+        else
+        {
+            // one single-table assignment per sub-range; all sub-ranges of a table are emitted before the next table
+            for (String tableName : tableNames)
+            {
+                for (Range<Token> splitRange : allRanges)
                 {
-                    curRight = right;
+                    repairAssignments.add(new RepairAssignment(splitRange, 
keyspaceName, Collections.singleton(tableName)));
                 }
-
-                Token childStartToken = 
StorageService.instance.getTokenMetadata()
                                        
.partitioner.getTokenFactory().fromString("" + curLeft);
-                Token childEndToken = 
StorageService.instance.getTokenMetadata()
                                      
.partitioner.getTokenFactory().fromString("" + curRight);
-                logger.debug("Current Token Left side {}, right side {}", 
childStartToken
                                                                         
.toString(), childEndToken.toString());
-                range.add(Pair.create(childStartToken, childEndToken));
             }
         }
-        return range;
+        return repairAssignments;
     }
 }
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java
 
b/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java
index 2b69898a36..fd63769431 100644
--- 
a/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java
+++ 
b/src/java/org/apache/cassandra/repair/autorepair/IAutoRepairTokenRangeSplitter.java
@@ -19,13 +19,83 @@ package org.apache.cassandra.repair.autorepair;
 
 
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.Pair;
 
+/**
+ * Strategy used by the autorepair framework to turn the token ranges and tables of a keyspace into
+ * {@link RepairAssignment}s.
+ * <p>
+ * Implementations are instantiated reflectively from the configured class name. A public constructor taking a
+ * {@code Map<String, String>} of splitter-specific parameters is preferred; otherwise a public no-argument
+ * constructor is used.
+ */
 public interface IAutoRepairTokenRangeSplitter
 {
-    // split the token range you wish to repair into multiple subranges
-    // the autorepair framework will repair the list of returned subrange in a 
sequence
-    List<Pair<Token, Token>> getRange(AutoRepairConfig.RepairType repairType, 
boolean primaryRangeOnly, String keyspaceName, String tableName);
+
+    /**
+     * Splits the token ranges of {@code keyspaceName} that this node should repair into repair assignments.
+     * <p>
+     * Each assignment pairs a token range with the keyspace and a set of tables. An implementation therefore
+     * controls two things: how ranges are split, and how tables are grouped (one assignment per table, or one
+     * covering several tables). The autorepair framework repairs the returned assignments in sequence.
+     *
+     * @param repairType The type of repair being executed
+     * @param primaryRangeOnly Whether to repair only this node's primary ranges or all of its ranges.
+     * @param keyspaceName The keyspace being repaired
+     * @param tableNames The tables to repair
+     * @return repair assignments broken up by range, keyspace and tables.
+     */
+    List<RepairAssignment> getRepairAssignments(AutoRepairConfig.RepairType 
repairType, boolean primaryRangeOnly, String keyspaceName, Set<String> 
tableNames);
+
+    /**
+     * Defines a repair assignment to be issued by the autorepair framework: one token range of one keyspace,
+     * restricted to a set of tables.
+     */
+    class RepairAssignment
+    {
+        // the token range to repair
+        final Range<Token> tokenRange;
+
+        // the keyspace owning the tables below
+        final String keyspaceName;
+
+        // the tables to repair over tokenRange; the set given to the constructor is stored as-is (not copied)
+        final Set<String> tableNames;
+
+        /**
+         * @param tokenRange the token range to repair
+         * @param keyspaceName the keyspace being repaired
+         * @param tableNames the tables of {@code keyspaceName} to repair over {@code tokenRange}; stored by reference
+         */
+        public RepairAssignment(Range<Token> tokenRange, String keyspaceName, 
Set<String> tableNames)
+        {
+            this.tokenRange = tokenRange;
+            this.keyspaceName = keyspaceName;
+            this.tableNames = tableNames;
+        }
+
+        /** @return the token range to repair */
+        public Range<Token> getTokenRange()
+        {
+            return tokenRange;
+        }
+
+        /** @return the keyspace being repaired */
+        public String getKeyspaceName()
+        {
+            return keyspaceName;
+        }
+
+        /** @return the tables to repair; the same set instance passed to the constructor */
+        public Set<String> getTableNames()
+        {
+            return tableNames;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            // getClass() rather than instanceof keeps equality symmetric with subclasses that add state
+            if (o == null || getClass() != o.getClass()) return false;
+            RepairAssignment that = (RepairAssignment) o;
+            return Objects.equals(tokenRange, that.tokenRange) && 
Objects.equals(keyspaceName, that.keyspaceName) && Objects.equals(tableNames, 
that.tableNames);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(tokenRange, keyspaceName, tableNames);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RepairAssignment{" +
+                   "tokenRange=" + tokenRange +
+                   ", keyspaceName='" + keyspaceName + '\'' +
+                   ", tableNames=" + tableNames +
+                   '}';
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/repair/autorepair/UnrepairedBytesBasedTokenRangeSplitter.java
 
b/src/java/org/apache/cassandra/repair/autorepair/UnrepairedBytesBasedTokenRangeSplitter.java
new file mode 100644
index 0000000000..1e65d323d9
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/repair/autorepair/UnrepairedBytesBasedTokenRangeSplitter.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.autorepair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.big.BigTableScanner;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+import static org.apache.cassandra.repair.autorepair.AutoRepairUtils.splitEvenly;
+
+/**
+ * Prototype {@link IAutoRepairTokenRangeSplitter} for incremental repair that sizes repair assignments by the
+ * approximate amount of unrepaired data each token range holds.
+ * <p>
+ * Splitter-specific parameters:
+ * <ul>
+ *   <li>{@code subrange_size}: target unrepaired bytes per assignment; ranges holding more are split evenly
+ *   (default 100GiB, must be greater than zero)</li>
+ *   <li>{@code max_bytes_per_schedule}: bound on the summed approximate bytes of all assignments returned by one
+ *   {@link #getRepairAssignments} call, shared by all of its tables (default 500GiB)</li>
+ * </ul>
+ */
+public class UnrepairedBytesBasedTokenRangeSplitter implements IAutoRepairTokenRangeSplitter
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnrepairedBytesBasedTokenRangeSplitter.class);
+
+    static final String SUBRANGE_SIZE = "subrange_size";
+    static final String MAX_BYTES_PER_SCHEDULE = "max_bytes_per_schedule";
+
+    private static final DataStorageSpec.LongBytesBound DEFAULT_SUBRANGE_SIZE = new DataStorageSpec.LongBytesBound("100GiB");
+    private static final DataStorageSpec.LongBytesBound DEFAULT_MAX_BYTES_PER_SCHEDULE = new DataStorageSpec.LongBytesBound("500GiB");
+
+    // target bytes per subrange
+    private final DataStorageSpec.LongBytesBound subrangeSize;
+
+    // maximum target bytes to repair per getRepairAssignments call
+    private final DataStorageSpec.LongBytesBound maxBytesPerSchedule;
+
+    // subrangeSize in bytes; checked to be > 0 in the constructor because it is used as a divisor
+    private final long subrangeBytes;
+
+    // maxBytesPerSchedule in bytes
+    private final long maxBytesPerScheduleBytes;
+
+    /**
+     * @param parameters splitter-specific options keyed by {@link #SUBRANGE_SIZE} and {@link #MAX_BYTES_PER_SCHEDULE};
+     *                   missing keys fall back to the defaults and {@code null} is treated as an empty map
+     * @throws IllegalArgumentException if {@code subrange_size} is zero bytes
+     */
+    public UnrepairedBytesBasedTokenRangeSplitter(Map<String, String> parameters)
+    {
+        Map<String, String> options = parameters != null ? parameters : Collections.<String, String>emptyMap();
+
+        // Demonstrates parameterizing a range splitter so we can have splitter specific options.
+        subrangeSize = options.containsKey(SUBRANGE_SIZE)
+                       ? new DataStorageSpec.LongBytesBound(options.get(SUBRANGE_SIZE))
+                       : DEFAULT_SUBRANGE_SIZE;
+        subrangeBytes = subrangeSize.toBytes();
+        if (subrangeBytes <= 0)
+        {
+            throw new IllegalArgumentException(SUBRANGE_SIZE + " must be greater than 0 but was " + subrangeSize);
+        }
+
+        maxBytesPerSchedule = options.containsKey(MAX_BYTES_PER_SCHEDULE)
+                              ? new DataStorageSpec.LongBytesBound(options.get(MAX_BYTES_PER_SCHEDULE))
+                              : DEFAULT_MAX_BYTES_PER_SCHEDULE;
+        maxBytesPerScheduleBytes = maxBytesPerSchedule.toBytes();
+
+        logger.info("Configured {} with {}={}, {}={}", UnrepairedBytesBasedTokenRangeSplitter.class.getName(),
+                    SUBRANGE_SIZE, subrangeSize, MAX_BYTES_PER_SCHEDULE, maxBytesPerSchedule);
+    }
+
+    /**
+     * Builds byte-bounded assignments for each table of {@code keyspaceName}, one table after another.
+     * A single {@code max_bytes_per_schedule} budget is shared by all tables.
+     *
+     * @throws IllegalArgumentException if {@code repairType} is not incremental, or a table cannot be resolved
+     */
+    @Override
+    public List<RepairAssignment> getRepairAssignments(AutoRepairConfig.RepairType repairType, boolean primaryRangeOnly, String keyspaceName, Set<String> tableNames)
+    {
+        logger.info("Calculating token range splits for repairType={} primaryRangeOnly={} keyspaceName={} tableNames={}", repairType, primaryRangeOnly, keyspaceName, tableNames);
+        if (repairType != AutoRepairConfig.RepairType.incremental)
+        {
+            throw new IllegalArgumentException(this.getClass().getName() + " only supports " + AutoRepairConfig.RepairType.incremental + " repair");
+        }
+
+        // TODO: create a custom repair assignment that indicates number of bytes in repair and join tables by byte size.
+        Collection<Range<Token>> tokenRanges = getTokenRanges(primaryRangeOnly, keyspaceName);
+        List<RepairAssignment> repairAssignments = new ArrayList<>();
+        // Carry the byte total across tables so max_bytes_per_schedule bounds the whole result, not each table.
+        long bytesSoFar = 0L;
+        for (String tableName : tableNames)
+        {
+            bytesSoFar = addRepairAssignmentsForTable(keyspaceName, tableName, tokenRanges, bytesSoFar, repairAssignments);
+        }
+        return repairAssignments;
+    }
+
+    /**
+     * Builds byte-bounded assignments for a single table, with a {@code max_bytes_per_schedule} budget of its own.
+     *
+     * @throws IllegalArgumentException if the table cannot be resolved
+     */
+    public List<RepairAssignment> getRepairAssignmentsForTable(String keyspaceName, String tableName, Collection<Range<Token>> tokenRanges)
+    {
+        List<RepairAssignment> repairAssignments = new ArrayList<>();
+        addRepairAssignmentsForTable(keyspaceName, tableName, tokenRanges, 0L, repairAssignments);
+        return repairAssignments;
+    }
+
+    /**
+     * Appends the assignments for one table to {@code repairAssignments}, in range order.
+     * Stops at the first assignment that {@link #canAddAssignment} refuses.
+     *
+     * @param bytesSoFar approximate bytes already assigned before this table
+     * @return the running total of approximate bytes after this table's assignments
+     */
+    private long addRepairAssignmentsForTable(String keyspaceName, String tableName, Collection<Range<Token>> tokenRanges,
+                                              long bytesSoFar, List<RepairAssignment> repairAssignments)
+    {
+        // resolved once per table; throws IllegalArgumentException if the table does not exist
+        ColumnFamilyStore cfs = getColumnFamilyStore(keyspaceName, tableName);
+        long targetBytesSoFar = bytesSoFar;
+
+        for (Range<Token> tokenRange : tokenRanges)
+        {
+            logger.info("Calculating unrepaired bytes for {}.{} for range {}", keyspaceName, tableName, tokenRange);
+            // Capture the amount of unrepaired bytes for range
+            long approximateUnrepairedBytesForRange = 0L;
+            // Capture the total bytes in read sstables, this will be useful for calculating the ratio
+            // of data in SSTables including this range and also useful to know how much anticompaction there will be.
+            long totalBytesInUnrepairedSSTables = 0L;
+            try (Refs<SSTableReader> refs = getSSTableReaderRefs(cfs, tokenRange))
+            {
+                for (SSTableReader reader : refs)
+                {
+                    // Only evaluate unrepaired SSTables.
+                    if (reader.isRepaired())
+                    {
+                        logger.debug("Skipping over {}.{} {} ({}) because it is repaired", keyspaceName, tableName, reader.descriptor.baseFilename(), FileUtils.stringifyFileSize(reader.bytesOnDisk()));
+                        continue;
+                    }
+                    long sstableSize = reader.bytesOnDisk();
+                    totalBytesInUnrepairedSSTables += sstableSize;
+                    approximateUnrepairedBytesForRange += estimateRangeBytesInSSTable(keyspaceName, tableName, reader, sstableSize, tokenRange);
+                }
+            }
+
+            // Only consider token range if it had unrepaired sstables or live data in memtables.
+            if (totalBytesInUnrepairedSSTables > 0L)
+            {
+                // TODO: Possibly some anticompaction configuration we want here, where if we detect a large amount of anticompaction we want to reduce the work we do.
+                double ratio = approximateUnrepairedBytesForRange / (double) totalBytesInUnrepairedSSTables;
+                logger.info("Calculated unrepaired bytes for {}.{} for range {}: sstableSize={}, rangeBytesInSSTables={}, ratio={}", keyspaceName, tableName, tokenRange,
+                            FileUtils.stringifyFileSize(totalBytesInUnrepairedSSTables), FileUtils.stringifyFileSize(approximateUnrepairedBytesForRange), ratio);
+
+                // TODO: split on byte size here, this is currently a bit naive in assuming that data is evenly distributed among the range which may not be the
+                // right assumption.  May want to consider when splitting on these ranges to reevaluate how much data is in the range, but for this
+                // exists as a demonstration.
+                List<Range<Token>> assignmentRanges;
+                long bytesPerAssignment;
+                if (approximateUnrepairedBytesForRange < subrangeBytes)
+                {
+                    // accept range as is if less than bytes.
+                    logger.info("Using 1 repair assignment for {}.{} for range {} as {} is less than {}", keyspaceName, tableName, tokenRange,
+                                FileUtils.stringifyFileSize(approximateUnrepairedBytesForRange), subrangeSize);
+                    assignmentRanges = Collections.singletonList(tokenRange);
+                    bytesPerAssignment = approximateUnrepairedBytesForRange;
+                }
+                else
+                {
+                    // >= 1 because approximateUnrepairedBytesForRange >= subrangeBytes > 0; capped so the cast cannot overflow.
+                    int targetRanges = (int) Math.min(approximateUnrepairedBytesForRange / subrangeBytes, Integer.MAX_VALUE);
+                    // TODO: approximation per range, this is a bit lossy since targetRanges rounds down.
+                    bytesPerAssignment = approximateUnrepairedBytesForRange / targetRanges;
+                    logger.info("Splitting {}.{} for range {} into {} sub ranges, approximateBytesPerSplit={}", keyspaceName, tableName, tokenRange, targetRanges, bytesPerAssignment);
+                    assignmentRanges = splitEvenly(tokenRange, targetRanges);
+                }
+
+                for (Range<Token> assignmentRange : assignmentRanges)
+                {
+                    RepairAssignment assignment = new BytesBasedRepairAssignment(assignmentRange, keyspaceName, Collections.singleton(tableName), bytesPerAssignment);
+                    if (!canAddAssignment(assignment, targetBytesSoFar, bytesPerAssignment))
+                    {
+                        return targetBytesSoFar;
+                    }
+                    repairAssignments.add(assignment);
+                    targetBytesSoFar += bytesPerAssignment;
+                }
+            }
+            else
+            {
+                // NOTE: live data size of the table's whole current memtable, not restricted to this range
+                long memtableSize = cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize();
+                if (memtableSize > 0L)
+                {
+                    logger.info("Included {}.{} range {}, had no unrepaired SSTables, but memtableSize={}, adding single repair assignment", keyspaceName, tableName, tokenRange, memtableSize);
+                    RepairAssignment assignment = new BytesBasedRepairAssignment(tokenRange, keyspaceName, Collections.singleton(tableName), memtableSize);
+                    // same budget check as the sstable path so memtable-only ranges cannot exceed the limit
+                    if (!canAddAssignment(assignment, targetBytesSoFar, memtableSize))
+                    {
+                        return targetBytesSoFar;
+                    }
+                    repairAssignments.add(assignment);
+                    targetBytesSoFar += memtableSize;
+                }
+                else
+                {
+                    logger.info("Skipping {}.{} for range {} because it had no unrepaired SSTables and no memtable data", keyspaceName, tableName, tokenRange);
+                }
+            }
+        }
+        return targetBytesSoFar;
+    }
+
+    /**
+     * Approximates how many bytes of {@code reader}'s data file fall within {@code tokenRange}.
+     * It uses the data positions of the range bounds and does not read the partitions themselves.
+     *
+     * @return the estimate, capped at {@code sstableSize}; 0 when the sstable has no partitions in the range
+     */
+    private static long estimateRangeBytesInSSTable(String keyspaceName, String tableName, SSTableReader reader, long sstableSize, Range<Token> tokenRange)
+    {
+        Set<Range<Token>> ranges = Collections.singleton(tokenRange);
+        // get the bounds of the sstable for this range using the index file but do not actually read it.
+        List<AbstractBounds<PartitionPosition>> bounds = BigTableScanner.makeBounds(reader, ranges);
+        if (bounds.isEmpty())
+        {
+            return 0L;
+        }
+
+        try (ISSTableScanner rawScanner = BigTableScanner.getScanner(reader, ranges))
+        {
+            // getScanner may hand back a scanner that is not a BigTableScanner (presumably an empty scanner) when
+            // no partitions of the sstable fall in the range; casting it unconditionally would throw.
+            if (!(rawScanner instanceof BigTableScanner))
+            {
+                return 0L;
+            }
+            BigTableScanner scanner = (BigTableScanner) rawScanner;
+            assert bounds.size() == 1 : bounds;
+
+            AbstractBounds<PartitionPosition> bound = bounds.get(0);
+            long startPosition = scanner.getDataPosition(bound.left);
+            long endPosition = scanner.getDataPosition(bound.right);
+            // If end position is 0 we can assume the sstable ended before that token, bound at size of file
+            if (endPosition == 0)
+            {
+                endPosition = sstableSize;
+            }
+
+            long approximateRangeBytesInSSTable = Math.max(0, endPosition - startPosition);
+            double ratio = approximateRangeBytesInSSTable / (double) sstableSize;
+            logger.debug("Calculations for {}.{} {}: sstableSize={}, rangeBytesInSSTable={}, startPosition={}, endPosition={}, ratio={}",
+                         keyspaceName, tableName, reader.descriptor.baseFilename(),
+                         FileUtils.stringifyFileSize(sstableSize), FileUtils.stringifyFileSize(approximateRangeBytesInSSTable), startPosition, endPosition, ratio);
+            // get the fraction of the sstable belonging to the range.
+            return Math.min(approximateRangeBytesInSSTable, sstableSize);
+        }
+    }
+
+    // Returns true when adding bytesToBeAdded keeps the running total strictly below max_bytes_per_schedule.
+    private boolean canAddAssignment(RepairAssignment repairAssignment, long targetBytesSoFar, long bytesToBeAdded)
+    {
+        if (targetBytesSoFar + bytesToBeAdded < maxBytesPerScheduleBytes)
+        {
+            return true;
+        }
+        logger.warn("Refusing to add {} with a target size of {} because it would increase total repair bytes to {} which is greater than {}={}",
+                    repairAssignment, FileUtils.stringifyFileSize(bytesToBeAdded), FileUtils.stringifyFileSize(targetBytesSoFar + bytesToBeAdded), MAX_BYTES_PER_SCHEDULE, maxBytesPerSchedule);
+        return false;
+    }
+
+    /**
+     * Returns the ranges of {@code keyspaceName} for this node, with wrap-around ranges unwrapped.
+     * These are the primary ranges, or all local ranges when {@code primaryRangeOnly} is false.
+     */
+    public Collection<Range<Token>> getTokenRanges(boolean primaryRangeOnly, String keyspaceName)
+    {
+        // Collect all applicable token ranges
+        Collection<Range<Token>> wrappedRanges;
+        if (primaryRangeOnly)
+        {
+            wrappedRanges = StorageService.instance.getPrimaryRanges(keyspaceName);
+        }
+        else
+        {
+            wrappedRanges = StorageService.instance.getLocalRanges(keyspaceName);
+        }
+
+        // Unwrap each range as we need to account for ranges that overlap the ring
+        Collection<Range<Token>> ranges = new ArrayList<>();
+        for (Range<Token> wrappedRange : wrappedRanges)
+        {
+            ranges.addAll(wrappedRange.unwrap());
+        }
+
+        return ranges;
+    }
+
+    /**
+     * References the canonical sstables of {@code keyspaceName.tableName} that intersect {@code tokenRange}.
+     *
+     * @throws IllegalArgumentException if the table cannot be resolved
+     */
+    public Refs<SSTableReader> getSSTableReaderRefs(String keyspaceName, String tableName, Range<Token> tokenRange)
+    {
+        return getSSTableReaderRefs(getColumnFamilyStore(keyspaceName, tableName), tokenRange);
+    }
+
+    // Selects canonical sstables of cfs overlapping tokenRange through an interval tree and references them.
+    private static Refs<SSTableReader> getSSTableReaderRefs(ColumnFamilyStore cfs, Range<Token> tokenRange)
+    {
+        Iterable<SSTableReader> sstables = cfs.getTracker().getView().select(SSTableSet.CANONICAL);
+        SSTableIntervalTree tree = SSTableIntervalTree.build(sstables);
+        Range<PartitionPosition> r = Range.makeRowRange(tokenRange);
+        Iterable<SSTableReader> canonicalSSTables = View.sstablesInBounds(r.left, r.right, tree);
+
+        // TODO: may need to reason about this not working, e.g. if an sstable is released concurrently
+        // (presumably by compaction) before Refs.ref runs — confirm and consider a select-and-reference approach.
+        return Refs.ref(canonicalSSTables);
+    }
+
+    // Resolves the table, failing with the same message wherever a table lookup is needed.
+    private static ColumnFamilyStore getColumnFamilyStore(String keyspaceName, String tableName)
+    {
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspaceName, tableName);
+        if (cfs == null)
+        {
+            throw new IllegalArgumentException(String.format("Could not resolve ColumnFamilyStore from %s.%s", keyspaceName, tableName));
+        }
+        return cfs;
+    }
+
+    /**
+     * {@link RepairAssignment} that also carries the approximate number of bytes it is expected to repair.
+     */
+    public static class BytesBasedRepairAssignment extends RepairAssignment
+    {
+        private final long approximateBytes;
+
+        public BytesBasedRepairAssignment(Range<Token> tokenRange, String keyspaceName, Set<String> tableNames, long approximateBytes)
+        {
+            super(tokenRange, keyspaceName, tableNames);
+            this.approximateBytes = approximateBytes;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            if (!super.equals(o)) return false;
+            BytesBasedRepairAssignment that = (BytesBasedRepairAssignment) o;
+            return approximateBytes == that.approximateBytes;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(super.hashCode(), approximateBytes);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "BytesBasedRepairAssignment{" +
+                   "keyspaceName='" + keyspaceName + '\'' +
+                   ", approximateBytes=" + approximateBytes +
+                   ", tokenRange=" + tokenRange +
+                   ", tableNames=" + tableNames +
+                   '}';
+        }
+
+        public long getApproximateBytes()
+        {
+            return approximateBytes;
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index d63bbace79..f6eab7364c 100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -406,4 +406,4 @@ public final class SystemDistributedKeyspace
     {
         UNKNOWN, STARTED, SUCCESS
     }
-}
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index d3764778da..c9b8233c6f 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -50,6 +50,7 @@ import com.google.common.base.Suppliers;
 
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileInputStreamPlus;
 import org.apache.cassandra.io.util.FileOutputStreamPlus;
@@ -686,11 +687,31 @@ public class FBUtilities
         }
     }
 
-    public static IAutoRepairTokenRangeSplitter newAutoRepairTokenRangeSplitter(String className) throws ConfigurationException
+    /**
+     * Creates the auto repair token range splitter described by {@code parameterizedClass}.
+     * <p>
+     * A class name without a '.' is resolved inside {@code org.apache.cassandra.repair.autorepair}. The class must
+     * implement {@link IAutoRepairTokenRangeSplitter}. A public constructor taking a {@code Map<String, String>}
+     * receives the configured parameters (an empty map when none are set); otherwise a public no-argument
+     * constructor is used.
+     *
+     * @throws ConfigurationException if no class name is configured, the class cannot be loaded, does not implement
+     *                                {@link IAutoRepairTokenRangeSplitter}, or cannot be instantiated
+     */
+    public static IAutoRepairTokenRangeSplitter newAutoRepairTokenRangeSplitter(ParameterizedClass parameterizedClass) throws ConfigurationException
     {
-        if (!className.contains("."))
-            className = "org.apache.cassandra.repair.autorepair." + className;
-        return FBUtilities.construct(className, "auto repair token splitter");
+        if (parameterizedClass == null || parameterizedClass.class_name == null)
+        {
+            throw new ConfigurationException("Unable to create instance of IAutoRepairTokenRangeSplitter: no class_name configured");
+        }
+
+        String className = parameterizedClass.class_name.contains(".") ?
+                           parameterizedClass.class_name :
+                           "org.apache.cassandra.repair.autorepair." + parameterizedClass.class_name;
+
+        Class<?> tokenRangeSplitterClass;
+        try
+        {
+            tokenRangeSplitterClass = Class.forName(className);
+        }
+        catch (ClassNotFoundException ex)
+        {
+            throw new ConfigurationException("Unable to create instance of IAutoRepairTokenRangeSplitter for " + className, ex);
+        }
+
+        // fail with a clear message instead of a wrapped ClassCastException
+        if (!IAutoRepairTokenRangeSplitter.class.isAssignableFrom(tokenRangeSplitterClass))
+        {
+            throw new ConfigurationException(className + " does not implement " + IAutoRepairTokenRangeSplitter.class.getName());
+        }
+
+        Map<String, String> parameters = parameterizedClass.parameters != null ? parameterizedClass.parameters : Collections.emptyMap();
+        try
+        {
+            try
+            {
+                // first attempt to initialize with Map arguments.
+                return (IAutoRepairTokenRangeSplitter) tokenRangeSplitterClass.getConstructor(Map.class).newInstance(parameters);
+            }
+            catch (NoSuchMethodException nsme)
+            {
+                // fall back on no argument constructor.
+                return (IAutoRepairTokenRangeSplitter) tokenRangeSplitterClass.getConstructor().newInstance();
+            }
+        }
+        catch (Exception ex)
+        {
+            throw new ConfigurationException("Unable to create instance of IAutoRepairTokenRangeSplitter for " + className, ex);
+        }
     }
 
     /**
diff --git 
a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java 
b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java
index 07eb9070fe..1cff3569a8 100644
--- a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java
+++ b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairConfigTest.java
@@ -20,6 +20,10 @@ package org.apache.cassandra.repair.autorepair;
 
 import java.util.EnumMap;
 import java.util.Objects;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
@@ -30,7 +34,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.repair.autorepair.AutoRepairConfig.Options;
@@ -440,14 +446,27 @@ public class AutoRepairConfigTest extends CQLTester
     {
         Options defaultOptions = Options.getDefaultOptions();
 
-        assertEquals(DefaultAutoRepairTokenSplitter.class.getName(),defaultOptions.token_range_splitter);
+        ParameterizedClass expectedDefault = new ParameterizedClass(DefaultAutoRepairTokenSplitter.class.getName(), Collections.emptyMap());
+
+        assertEquals(expectedDefault, defaultOptions.token_range_splitter);
         assertEquals(DefaultAutoRepairTokenSplitter.class.getName(), FBUtilities.newAutoRepairTokenRangeSplitter(defaultOptions.token_range_splitter).getClass().getName());
     }
 
+    @Test
+    public void testTokenRangeSplitterAcceptingMap()
+    {
+        Map<String, String> splitterOptions = new HashMap<>();
+        splitterOptions.put("unrepaired_bytes_per_split", "1MiB");
+        config.global_settings.token_range_splitter = new ParameterizedClass(UnrepairedBytesBasedTokenRangeSplitter.class.getName(), splitterOptions);
+        // FBUtilities tries a Map-accepting constructor first; this verifies the configured splitter is actually instantiable.
+        assertEquals(UnrepairedBytesBasedTokenRangeSplitter.class.getName(), FBUtilities.newAutoRepairTokenRangeSplitter(config.global_settings.token_range_splitter).getClass().getName());
+    }
+
     @Test(expected = ConfigurationException.class)
     public void testInvalidTokenRangeSplitter()
     {
-        assertEquals(DefaultAutoRepairTokenSplitter.class.getName(), FBUtilities.newAutoRepairTokenRangeSplitter("invalid-class").getClass().getName());
+        // An unresolvable class name is surfaced by FBUtilities as a ConfigurationException.
+        FBUtilities.newAutoRepairTokenRangeSplitter(new ParameterizedClass("invalid-class", Collections.emptyMap()));
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java
index 1604055e4b..96b92e10ed 100644
--- a/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java
+++ b/test/unit/org/apache/cassandra/repair/autorepair/AutoRepairParameterizedTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.autorepair;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -36,6 +37,7 @@ import org.apache.cassandra.cql3.statements.schema.TableAttributes;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.RepairRunnable;
+import 
org.apache.cassandra.repair.autorepair.IAutoRepairTokenRangeSplitter.RepairAssignment;
 import org.apache.cassandra.schema.AutoRepairParams;
 import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.StorageService;
@@ -540,13 +542,12 @@ public class AutoRepairParameterizedTest extends CQLTester
     {
         Collection<Range<Token>> tokens = 
StorageService.instance.getPrimaryRanges(KEYSPACE);
         assertEquals(1, tokens.size());
-        List<Range<Token>> expectedToken = new ArrayList<>();
-        expectedToken.addAll(tokens);
+        List<Range<Token>> expectedToken = new ArrayList<>(tokens);
 
-        List<Pair<Token, Token>> ranges = new 
DefaultAutoRepairTokenSplitter().getRange(repairType, true, KEYSPACE, TABLE);
-        assertEquals(1, ranges.size());
-        assertEquals(expectedToken.get(0).left, ranges.get(0).left);
-        assertEquals(expectedToken.get(0).right, ranges.get(0).right);
+        List<RepairAssignment> assignments = new 
DefaultAutoRepairTokenSplitter().getRepairAssignments(repairType, true, 
KEYSPACE, Collections.singleton(TABLE));
+        assertEquals(1, assignments.size());
+        assertEquals(expectedToken.get(0).left, 
assignments.get(0).getTokenRange().left);
+        assertEquals(expectedToken.get(0).right, 
assignments.get(0).getTokenRange().right);
     }
 
     @Test
@@ -554,13 +555,13 @@ public class AutoRepairParameterizedTest extends CQLTester
     {
         Collection<Range<Token>> tokens = 
StorageService.instance.getPrimaryRanges(KEYSPACE);
         assertEquals(1, tokens.size());
-        List<Range<Token>> expectedToken = new ArrayList<>();
-        expectedToken.addAll(tokens);
+        // TODO: validate the tokens
+        List<Range<Token>> expectedToken = new ArrayList<>(tokens);
 
         AutoRepairConfig config = 
AutoRepairService.instance.getAutoRepairConfig();
         config.setRepairSubRangeNum(repairType, 4);
-        List<Pair<Token, Token>> ranges = new 
DefaultAutoRepairTokenSplitter().getRange(repairType, true, KEYSPACE, TABLE);
-        assertEquals(4, ranges.size());
+        List<RepairAssignment> assignments = new 
DefaultAutoRepairTokenSplitter().getRepairAssignments(repairType, true, 
KEYSPACE, Collections.singleton(TABLE));
+        assertEquals(4, assignments.size());
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to