This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 881cff46da8f4a0369e3dcd96231535da17ec193
Merge: 3db6444 0f46c90
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Thu Sep 24 21:17:44 2020 -0700

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   2 +
 .../apache/cassandra/repair/RepairRunnable.java    |  25 +++-
 .../cassandra/repair/messages/RepairOption.java    |  14 ++-
 .../apache/cassandra/tools/nodetool/Repair.java    |   4 +
 .../distributed/test/RepairOperationalTest.java    | 131 +++++++++++++++++++++
 5 files changed, 168 insertions(+), 8 deletions(-)

diff --cc CHANGES.txt
index 1db3ebc,189aec4..2e6715f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,15 +1,52 @@@
 -3.11.9
 +4.0-beta3
   * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 + * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
 + * Avoid invalid state transition exception during incremental repair 
(CASSANDRA-16067)
 + * Allow zero padding in timestamp serialization (CASSANDRA-16105)
 + * Add byte array backed cells (CASSANDRA-15393)
 + * Correctly handle pending ranges with adjacent range movements 
(CASSANDRA-14801)
 + * Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
 + * Add nodetool getfullquerylog (CASSANDRA-15988)
 + * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
 + * Avoid trying to keep track of RTs for endpoints we won't write to during 
read repair (CASSANDRA-16084)
 + * When compaction gets interrupted, the exception should include the 
compactionId (CASSANDRA-15954)
 + * Make Table/Keyspace Metric Names Consistent With Each Other 
(CASSANDRA-15909)
 + * Mutating sstable component may race with entire-sstable-streaming(ZCS) 
causing checksum validation failure (CASSANDRA-15861)
 + * NPE thrown while updating speculative execution time if keyspace is 
removed during task execution (CASSANDRA-15949)
 + * Show the progress of data streaming and index build (CASSANDRA-15406)
 +Merged from 3.11:
 + * Use IF NOT EXISTS for index and UDT create statements in snapshot schema 
files (CASSANDRA-13935)
   * Make sure LCS handles duplicate sstable added/removed notifications 
correctly (CASSANDRA-14103)
+ Merged from 3.0:
+  * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
  
 -3.11.8
 +4.0-beta2
 + * Add additional incremental repair visibility to nodetool repair_admin 
(CASSANDRA-14939)
 + * Always access system properties and environment variables via the new 
CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
 + * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
 + * Prevent repair from overrunning compaction (CASSANDRA-15817)
 + * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
 + * Strip comment blocks from cqlsh input before processing statements 
(CASSANDRA-15802)
 + * Fix unicode chars error input (CASSANDRA-15990)
 + * Improved testability for CacheMetrics and ChunkCacheMetrics 
(CASSANDRA-15788)
 + * Handle errors in StreamSession#prepare (CASSANDRA-15852)
 + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
 + * Remove COMPACT STORAGE internals (CASSANDRA-13994)
 + * Make TimestampSerializer accept fractional seconds of varying precision 
(CASSANDRA-15976)
 + * Improve cassandra-stress logging when using a profile file that doesn't 
exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy 
(CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 + * Resolve JMX output inconsistencies from CASSANDRA-7544 
storage-port-configurable-per-node (CASSANDRA-15937)
 +Merged from 3.11:
   * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in 
megabytes not bytes (CASSANDRA-16071)
   * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException 
after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method 
(CASSANDRA-15857)
  Merged from 3.0:
 - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema 
files (CASSANDRA-13935)
   * Fix gossip shutdown order (CASSANDRA-15816)
   * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index e5e8e50,7a9590b..f6aa6d1
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -50,18 -33,9 +50,14 @@@ import org.apache.commons.lang3.time.Du
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.config.SchemaConstants;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.metrics.RepairMetrics;
- import org.apache.cassandra.db.SnapshotCommand;
 +import org.apache.cassandra.gms.FailureDetector;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.repair.consistent.SyncStatSummary;
 +import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.cql3.QueryOptions;
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
@@@ -82,8 -49,6 +78,7 @@@ import org.apache.cassandra.service.Act
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +import org.apache.cassandra.streaming.PreviewKind;
  import org.apache.cassandra.tracing.TraceKeyspace;
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
@@@ -307,104 -137,181 +302,122 @@@ public class RepairRunnable implements 
          String message = String.format("Starting repair command #%d (%s), 
repairing keyspace %s with %s", cmd, parentSession, keyspace,
                                         options);
          logger.info(message);
 -        if (options.isTraced())
 -        {
 -            StringBuilder cfsb = new StringBuilder();
 -            for (ColumnFamilyStore cfs : validColumnFamilies)
 -                cfsb.append(", 
").append(cfs.keyspace.getName()).append(".").append(cfs.name);
 -
 -            UUID sessionId = 
Tracing.instance.newSession(Tracing.TraceType.REPAIR);
 -            traceState = Tracing.instance.begin("repair", 
ImmutableMap.of("keyspace", keyspace, "columnFamilies",
 -                                                                          
cfsb.substring(2)));
 -            message = message + " tracing with " + sessionId;
 -            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 
0, 100, message));
 -            Tracing.traceRepair(message);
 -            traceState.enableActivityNotification(tag);
 -            for (ProgressListener listener : listeners)
 -                traceState.addProgressListener(listener);
 -            Thread queryThread = createQueryThread(cmd, sessionId);
 -            queryThread.setName("RepairTracePolling");
 -            queryThread.start();
 -        }
 -        else
 -        {
 -            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 
0, 100, message));
 -            traceState = null;
 -        }
 +        Tracing.traceRepair(message);
 +        fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, 
message));
 +    }
  
 -        final Set<InetAddress> allNeighbors = new HashSet<>();
 -        List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> 
commonRanges = new ArrayList<>();
 +    private NeighborsAndRanges getNeighborsAndRanges()
 +    {
 +        Set<InetAddressAndPort> allNeighbors = new HashSet<>();
 +        List<CommonRange> commonRanges = new ArrayList<>();
  
 -        //pre-calculate output of getLocalRanges and pass it to getNeighbors 
to increase performance and prevent
 +        //pre-calculate output of getLocalReplicas and pass it to 
getNeighbors to increase performance and prevent
          //calculation multiple times
 -        Collection<Range<Token>> keyspaceLocalRanges = 
storageService.getLocalRanges(keyspace);
 +        Iterable<Range<Token>> keyspaceLocalRanges = 
storageService.getLocalReplicas(keyspace).ranges();
  
 -        try
 +        for (Range<Token> range : options.getRanges())
          {
 -            for (Range<Token> range : options.getRanges())
 +            EndpointsForRange neighbors = 
ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
 +                                                                           
options.getDataCenters(),
 +                                                                           
options.getHosts());
- 
++            if (neighbors.isEmpty())
+             {
 -                Set<InetAddress> neighbors = 
ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
 -                                                                              
options.getDataCenters(),
 -                                                                              
options.getHosts());
 -
 -                if (neighbors.isEmpty())
++                if (options.ignoreUnreplicatedKeyspaces())
+                 {
 -                    if (options.ignoreUnreplicatedKeyspaces())
 -                    {
 -                        logger.info("Found no neighbors for range {} for {} - 
ignoring since repairing with --ignore-unreplicated-keyspaces", range, 
keyspace);
 -                        continue;
 -                    }
 -                    else
 -                    {
 -                        String errorMessage = String.format("Nothing to 
repair for %s in %s - aborting", range, keyspace);
 -                        logger.error("Repair {}",  errorMessage);
 -                        fireErrorAndComplete(tag, progress.get(), 
totalProgress, errorMessage);
 -                        return;
 -                    }
++                    logger.info("{} Found no neighbors for range {} for {} - 
ignoring since repairing with --ignore-unreplicated-keyspaces", parentSession, 
range, keyspace);
++                    continue;
++                }
++                else
++                {
++                    throw new RuntimeException(String.format("Nothing to 
repair for %s in %s - aborting", range, keyspace));
+                 }
 -                addRangeToNeighbors(commonRanges, range, neighbors);
 -                allNeighbors.addAll(neighbors);
+             }
 -
 -            progress.incrementAndGet();
 +            addRangeToNeighbors(commonRanges, range, neighbors);
 +            allNeighbors.addAll(neighbors.endpoints());
          }
 -        catch (IllegalArgumentException e)
 +
++        if (options.ignoreUnreplicatedKeyspaces() && allNeighbors.isEmpty())
+         {
 -            logger.error("Repair failed:", e);
 -            fireErrorAndComplete(tag, progress.get(), totalProgress, 
e.getMessage());
 -            return;
++            throw new SkipRepairException(String.format("Nothing to repair 
for %s in %s - unreplicated keyspace is ignored since repair was called with 
--ignore-unreplicated-keyspaces",
++                                                        options.getRanges(),
++                                                        keyspace));
+         }
+ 
 -        if (options.ignoreUnreplicatedKeyspaces() && allNeighbors.isEmpty())
 +        progressCounter.incrementAndGet();
 +
 +        boolean force = options.isForcedRepair();
 +
 +        if (force && options.isIncremental())
          {
 -            String ignoreUnreplicatedMessage = String.format("Nothing to 
repair for %s in %s - unreplicated keyspace is ignored since repair was called 
with --ignore-unreplicated-keyspaces",
 -                                                             
options.getRanges(),
 -                                                             keyspace);
 +            Set<InetAddressAndPort> actualNeighbors = 
Sets.newHashSet(Iterables.filter(allNeighbors, 
FailureDetector.instance::isAlive));
 +            force = !allNeighbors.equals(actualNeighbors);
 +            allNeighbors = actualNeighbors;
 +        }
 +        return new NeighborsAndRanges(force, allNeighbors, commonRanges);
 +    }
  
 -            logger.info("Repair {}", ignoreUnreplicatedMessage);
 -            fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.COMPLETE,
 -                                                     progress.get(),
 -                                                     totalProgress,
 -                                                     
ignoreUnreplicatedMessage));
 -            return;
 +    private void maybeStoreParentRepairStart(String[] cfnames)
 +    {
 +        if (!options.isPreview())
 +        {
 +            SystemDistributedKeyspace.startParentRepair(parentSession, 
keyspace, cfnames, options);
          }
 +    }
  
 -        // Validate columnfamilies
 -        List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
 -        try
 +    private void maybeStoreParentRepairSuccess(Collection<Range<Token>> 
successfulRanges)
 +    {
 +        if (!options.isPreview())
          {
 -            Iterables.addAll(columnFamilyStores, validColumnFamilies);
 -            progress.incrementAndGet();
 +            SystemDistributedKeyspace.successfulParentRepair(parentSession, 
successfulRanges);
          }
 -        catch (IllegalArgumentException e)
 +    }
 +
 +    private void maybeStoreParentRepairFailure(Throwable error)
 +    {
 +        if (!options.isPreview())
          {
 -            fireErrorAndComplete(tag, progress.get(), totalProgress, 
e.getMessage());
 -            return;
 +            SystemDistributedKeyspace.failParentRepair(parentSession, error);
          }
 +    }
  
 -        String[] cfnames = new String[columnFamilyStores.size()];
 -        for (int i = 0; i < columnFamilyStores.size(); i++)
 +    private void prepare(List<ColumnFamilyStore> columnFamilies, 
Set<InetAddressAndPort> allNeighbors, boolean force)
 +    {
 +        try (Timer.Context ignore = 
Keyspace.open(keyspace).metric.repairPrepareTime.time())
          {
 -            cfnames[i] = columnFamilyStores.get(i).name;
 +            ActiveRepairService.instance.prepareForRepair(parentSession, 
FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, 
columnFamilies);
 +            progressCounter.incrementAndGet();
          }
 +    }
  
 -        SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, 
cfnames, options);
 -        long repairedAt;
 -        try
 +    private void repair(String[] cfnames, NeighborsAndRanges 
neighborsAndRanges)
 +    {
 +        if (options.isPreview())
          {
 -            ActiveRepairService.instance.prepareForRepair(parentSession, 
FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
 -            repairedAt = 
ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
 -            progress.incrementAndGet();
 +            previewRepair(parentSession, creationTimeMillis, 
neighborsAndRanges.commonRanges, cfnames);
          }
 -        catch (Throwable t)
 +        else if (options.isIncremental())
          {
 -            SystemDistributedKeyspace.failParentRepair(parentSession, t);
 -            fireErrorAndComplete(tag, progress.get(), totalProgress, 
t.getMessage());
 -            return;
 +            incrementalRepair(parentSession, creationTimeMillis, 
neighborsAndRanges.force, traceState,
 +                              neighborsAndRanges.allNeighbors, 
neighborsAndRanges.commonRanges, cfnames);
          }
 +        else
 +        {
 +            normalRepair(parentSession, creationTimeMillis, traceState, 
neighborsAndRanges.commonRanges, cfnames);
 +        }
 +    }
  
 -        // Set up RepairJob executor for this repair command.
 -        final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(new 
JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
 -                                                                              
                                           Integer.MAX_VALUE,
 -                                                                              
                                           TimeUnit.SECONDS,
 -                                                                              
                                           new LinkedBlockingQueue<Runnable>(),
 -                                                                              
                                           new NamedThreadFactory("Repair#" + 
cmd),
 -                                                                              
                                           "internal"));
 +    private void normalRepair(UUID parentSession,
 +                              long startTime,
 +                              TraceState traceState,
 +                              List<CommonRange> commonRanges,
 +                              String... cfnames)
 +    {
  
 -        List<ListenableFuture<RepairSessionResult>> futures = new 
ArrayList<>(options.getRanges().size());
 -        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : 
commonRanges)
 -        {
 -            final RepairSession session = 
ActiveRepairService.instance.submitRepairSession(parentSession,
 -                                                              p.right,
 -                                                              keyspace,
 -                                                              
options.getParallelism(),
 -                                                              p.left,
 -                                                              repairedAt,
 -                                                              
options.isPullRepair(),
 -                                                              executor,
 -                                                              cfnames);
 -            if (session == null)
 -                continue;
 -            // After repair session completes, notify client its result
 -            Futures.addCallback(session, new 
FutureCallback<RepairSessionResult>()
 -            {
 -                public void onSuccess(RepairSessionResult result)
 -                {
 -                    /**
 -                     * If the success message below is modified, it must also 
be updated on
 -                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
 -                     * for backward-compatibility support.
 -                     */
 -                    String message = String.format("Repair session %s for 
range %s finished", session.getId(),
 -                                                   
session.getRanges().toString());
 -                    logger.info(message);
 -                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
 -                                                             
progress.incrementAndGet(),
 -                                                             totalProgress,
 -                                                             message));
 -                }
 +        // Set up RepairJob executor for this repair command.
 +        ListeningExecutorService executor = createExecutor();
  
 -                public void onFailure(Throwable t)
 -                {
 -                    /**
 -                     * If the failure message below is modified, it must also 
be updated on
 -                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
 -                     * for backward-compatibility support.
 -                     */
 -                    String message = String.format("Repair session %s for 
range %s failed with error %s",
 -                                                   session.getId(), 
session.getRanges().toString(), t.getMessage());
 -                    logger.error(message, t);
 -                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
 -                                                             
progress.incrementAndGet(),
 -                                                             totalProgress,
 -                                                             message));
 -                }
 -            });
 -            futures.add(session);
 -        }
 +        // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the 
repairedAt times to be preserved across streamed sstables
 +        final ListenableFuture<List<RepairSessionResult>> allSessions = 
submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
  
          // After all repair sessions completes(successful or not),
          // run anticompaction if necessary and send finish notice back to 
client
diff --cc src/java/org/apache/cassandra/repair/messages/RepairOption.java
index f7ed052,14fff97..a76e6c0
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@@ -48,9 -47,7 +48,10 @@@ public class RepairOptio
      public static final String TRACE_KEY = "trace";
      public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair";
      public static final String PULL_REPAIR_KEY = "pullRepair";
 +    public static final String FORCE_REPAIR_KEY = "forceRepair";
 +    public static final String PREVIEW = "previewKind";
 +    public static final String OPTIMISE_STREAMS_KEY = "optimiseStreams";
+     public static final String IGNORE_UNREPLICATED_KS = 
"ignoreUnreplicatedKeyspaces";
  
      // we don't want to push nodes too much for repair
      public static final int MAX_JOB_THREADS = 4;
@@@ -175,10 -137,9 +176,11 @@@
          RepairParallelism parallelism = 
RepairParallelism.fromName(options.get(PARALLELISM_KEY));
          boolean primaryRange = 
Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
          boolean incremental = 
Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
 +        PreviewKind previewKind = 
PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString()));
          boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
 +        boolean force = Boolean.parseBoolean(options.get(FORCE_REPAIR_KEY));
          boolean pullRepair = 
Boolean.parseBoolean(options.get(PULL_REPAIR_KEY));
+         boolean ignoreUnreplicatedKeyspaces = 
Boolean.parseBoolean(options.get(IGNORE_UNREPLICATED_KS));
  
          int jobThreads = 1;
          if (options.containsKey(JOB_THREADS_KEY))
@@@ -189,13 -150,34 +191,13 @@@
              }
              catch (NumberFormatException ignore) {}
          }
 +
          // ranges
 -        String rangesStr = options.get(RANGES_KEY);
 -        Set<Range<Token>> ranges = new HashSet<>();
 -        if (rangesStr != null)
 -        {
 -            if (incremental)
 -                logger.warn("Incremental repair can't be requested with 
subrange repair " +
 -                            "because each subrange repair would generate an 
anti-compacted table. " +
 -                            "The repair will occur but without 
anti-compaction.");
 -            StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
 -            while (tokenizer.hasMoreTokens())
 -            {
 -                String[] rangeStr = tokenizer.nextToken().split(":", 2);
 -                if (rangeStr.length < 2)
 -                {
 -                    continue;
 -                }
 -                Token parsedBeginToken = 
partitioner.getTokenFactory().fromString(rangeStr[0].trim());
 -                Token parsedEndToken = 
partitioner.getTokenFactory().fromString(rangeStr[1].trim());
 -                if (parsedBeginToken.equals(parsedEndToken))
 -                {
 -                    throw new IllegalArgumentException("Start and end tokens 
must be different.");
 -                }
 -                ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
 -            }
 -        }
 +        Set<Range<Token>> ranges = parseRanges(options.get(RANGES_KEY), 
partitioner);
 +
 +        boolean asymmetricSyncing = 
Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));
  
-         RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, 
previewKind, asymmetricSyncing);
 -        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, 
ignoreUnreplicatedKeyspaces);
++        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, 
previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces);
  
          // data centers
          String dataCentersStr = options.get(DATACENTERS_KEY);
@@@ -271,16 -253,14 +273,17 @@@
      private final int jobThreads;
      private final boolean isSubrangeRepair;
      private final boolean pullRepair;
 +    private final boolean forceRepair;
 +    private final PreviewKind previewKind;
 +    private final boolean optimiseStreams;
+     private final boolean ignoreUnreplicatedKeyspaces;
  
      private final Collection<String> columnFamilies = new HashSet<>();
      private final Collection<String> dataCenters = new HashSet<>();
      private final Collection<String> hosts = new HashSet<>();
      private final Collection<Range<Token>> ranges = new HashSet<>();
  
-     public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, 
PreviewKind previewKind, boolean optimiseStreams)
 -    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges, boolean isSubrangeRepair, boolean pullRepair, boolean 
ignoreUnreplicatedKeyspaces)
++    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, 
PreviewKind previewKind, boolean optimiseStreams, boolean 
ignoreUnreplicatedKeyspaces)
      {
          if (FBUtilities.isWindows &&
              (DatabaseDescriptor.getDiskAccessMode() != 
Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != 
Config.DiskAccessMode.standard) &&
@@@ -299,9 -279,7 +302,10 @@@
          this.ranges.addAll(ranges);
          this.isSubrangeRepair = isSubrangeRepair;
          this.pullRepair = pullRepair;
 +        this.forceRepair = forceRepair;
 +        this.previewKind = previewKind;
 +        this.optimiseStreams = optimiseStreams;
+         this.ignoreUnreplicatedKeyspaces = ignoreUnreplicatedKeyspaces;
      }
  
      public RepairParallelism getParallelism()
@@@ -384,28 -346,25 +388,34 @@@
          return dataCenters.size() == 1 && 
dataCenters.contains(DatabaseDescriptor.getLocalDataCenter());
      }
  
 +    public boolean optimiseStreams()
 +    {
 +        return optimiseStreams;
 +    }
 +
+     public boolean ignoreUnreplicatedKeyspaces()
+     {
+         return ignoreUnreplicatedKeyspaces;
+     }
++
      @Override
      public String toString()
      {
          return "repair options (" +
 -                       "parallelism: " + parallelism +
 -                       ", primary range: " + primaryRange +
 -                       ", incremental: " + incremental +
 -                       ", job threads: " + jobThreads +
 -                       ", ColumnFamilies: " + columnFamilies +
 -                       ", dataCenters: " + dataCenters +
 -                       ", hosts: " + hosts +
 -                       ", # of ranges: " + ranges.size() +
 -                       ", pull repair: " + pullRepair +
 -                       ", ignore unreplicated keyspaces: "+ 
ignoreUnreplicatedKeyspaces +
 -                       ')';
 +               "parallelism: " + parallelism +
 +               ", primary range: " + primaryRange +
 +               ", incremental: " + incremental +
 +               ", job threads: " + jobThreads +
 +               ", ColumnFamilies: " + columnFamilies +
 +               ", dataCenters: " + dataCenters +
 +               ", hosts: " + hosts +
 +               ", previewKind: " + previewKind +
 +               ", # of ranges: " + ranges.size() +
 +               ", pull repair: " + pullRepair +
 +               ", force repair: " + forceRepair +
 +               ", optimise streams: "+ optimiseStreams +
++               ", ignore unreplicated keyspaces: "+ 
ignoreUnreplicatedKeyspaces +
 +               ')';
      }
  
      public Map<String, String> asMap()
diff --cc src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 990d241,180748d..0f9bfb3
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@@ -94,30 -84,9 +94,32 @@@ public class Repair extends NodeToolCm
      @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = 
"Use --pull to perform a one way repair where data is only streamed from a 
remote node to this node.")
      private boolean pullRepair = false;
  
 +    @Option(title = "optimise_streams", name = {"-os", "--optimise-streams"}, 
description = "Use --optimise-streams to try to reduce the number of streams we 
do (EXPERIMENTAL, see CASSANDRA-3200).")
 +    private boolean optimiseStreams = false;
 +
+     @Option(title = "ignore_unreplicated_keyspaces", name = 
{"-iuk","--ignore-unreplicated-keyspaces"}, description = "Use 
--ignore-unreplicated-keyspaces to ignore keyspaces which are not replicated, 
otherwise the repair will fail")
+     private boolean ignoreUnreplicatedKeyspaces = false;
  
 +    private PreviewKind getPreviewKind()
 +    {
 +        if (validate)
 +        {
 +            return PreviewKind.REPAIRED;
 +        }
 +        else if (preview && fullRepair)
 +        {
 +            return PreviewKind.ALL;
 +        }
 +        else if (preview)
 +        {
 +            return PreviewKind.UNREPAIRED;
 +        }
 +        else
 +        {
 +            return PreviewKind.NONE;
 +        }
 +    }
 +
      @Override
      public void execute(NodeProbe probe)
      {
@@@ -146,9 -115,8 +148,11 @@@
              options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
              options.put(RepairOption.COLUMNFAMILIES_KEY, 
StringUtils.join(cfnames, ","));
              options.put(RepairOption.PULL_REPAIR_KEY, 
Boolean.toString(pullRepair));
 +            options.put(RepairOption.FORCE_REPAIR_KEY, 
Boolean.toString(force));
 +            options.put(RepairOption.PREVIEW, getPreviewKind().toString());
 +            options.put(RepairOption.OPTIMISE_STREAMS_KEY, 
Boolean.toString(optimiseStreams));
+             options.put(RepairOption.IGNORE_UNREPLICATED_KS, 
Boolean.toString(ignoreUnreplicatedKeyspaces));
+ 
              if (!startToken.isEmpty() || !endToken.isEmpty())
              {
                  options.put(RepairOption.RANGES_KEY, startToken + ":" + 
endToken);
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java
index 499eae3,2f85227..a4628db
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java
@@@ -22,62 -22,142 +22,193 @@@ import java.io.IOException
  
  import org.junit.Test;
  
 +import net.bytebuddy.ByteBuddy;
 +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 +import net.bytebuddy.implementation.MethodDelegation;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
++import org.assertj.core.api.Assertions;
  
 +import static net.bytebuddy.matcher.ElementMatchers.named;
  import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
  import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 -import static org.junit.Assert.assertEquals;
  
  public class RepairOperationalTest extends TestBaseImpl
  {
      @Test
 +    public void compactionBehindTest() throws IOException
 +    {
 +        try(Cluster cluster = init(Cluster.build(2)
 +                                          .withConfig(config -> 
config.with(GOSSIP).with(NETWORK))
 +                                          
.withInstanceInitializer(ByteBuddyHelper::install)
 +                                          .start()))
 +        {
 +            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int 
primary key, x int)");
 +            for (int i = 0; i < 10; i++)
 +                cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl 
(id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i);
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +            cluster.forEach(i -> i.nodetoolResult("repair", 
"--full").asserts().success());
 +            cluster.get(2).runOnInstance(() -> {
 +                ByteBuddyHelper.pendingCompactions = 1000;
 +                
DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(500);
 +            });
 +            // make sure repair gets rejected on both nodes if pendingCompactions > threshold:
 +            cluster.forEach(i -> i.nodetoolResult("repair", 
"--full").asserts().failure());
 +            cluster.get(2).runOnInstance(() -> 
ByteBuddyHelper.pendingCompactions = 499);
 +            cluster.forEach(i -> i.nodetoolResult("repair", 
"--full").asserts().success());
 +        }
 +    }
 +
 +    public static class ByteBuddyHelper
 +    {
 +        public static volatile int pendingCompactions = 0;
++
 +        static void install(ClassLoader cl, int nodeNumber)
 +        {
 +            if (nodeNumber == 2)
 +            {
 +                new ByteBuddy().redefine(CompactionManager.class)
 +                               .method(named("getPendingTasks"))
 +                               
.intercept(MethodDelegation.to(ByteBuddyHelper.class))
 +                               .make()
 +                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
 +            }
 +        }
 +
 +        public static int getPendingTasks()
 +        {
 +            return pendingCompactions;
 +        }
 +    }
++
+     public void repairUnreplicatedKStest() throws IOException
+     {
+         try (Cluster cluster = init(Cluster.build(4)
+                                           .withDCs(2)
+                                           .withConfig(config -> 
config.with(GOSSIP).with(NETWORK))
+                                           .start()))
+         {
+             cluster.schemaChange("alter keyspace "+KEYSPACE+" with replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 'datacenter2':0}");
+             cluster.schemaChange("create table "+KEYSPACE+".tbl (id int 
primary key, i int)");
+             for (int i = 0; i < 10; i++)
+                 cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl 
(id, i) values (?, ?)", ConsistencyLevel.ALL, i, i);
+             cluster.forEach(i -> i.flush(KEYSPACE));
+ 
+             cluster.get(3).nodetoolResult("repair", "-full", KEYSPACE , 
"tbl", "-st", "0", "-et", "1000")
+                    .asserts()
+                    .failure()
+                    .errorContains("Nothing to repair for (0,1000] in distributed_test_keyspace - aborting");
+             cluster.get(3).nodetoolResult("repair", "-full", KEYSPACE , 
"tbl", "-st", "0", "-et", "1000", "--ignore-unreplicated-keyspaces")
+                    .asserts()
+                    .success()
+                    .notificationContains("unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces");
+ 
+         }
+     }
+ 
+     @Test
+     public void dcFilterOnEmptyDC() throws IOException
+     {
+         try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start())
+         {
+             // 1-2 : datacenter1
+             // 3-4 : datacenter2
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 
'datacenter2':0}");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int 
PRIMARY KEY, i int)");
+             for (int i = 0; i < 10; i++)
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i);
+             cluster.forEach(i -> i.flush(KEYSPACE));
+ 
+             // choose a node in the DC that doesn't have any replicas
+             IInvokableInstance node = cluster.get(3);
 -            assertEquals("datacenter2", node.config().localDatacenter());
++            
Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter2");
+             // fails with "the local data center must be part of the repair"
+             node.nodetoolResult("repair", "-full",
+                                 "-dc", "datacenter1", "-dc", "datacenter2",
+                                 "--ignore-unreplicated-keyspaces",
+                                 "-st", "0", "-et", "1000",
+                                 KEYSPACE, "tbl")
+                 .asserts().success();
+         }
+     }
+ 
+     @Test
+     public void hostFilterDifferentDC() throws IOException
+     {
+         try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start())
+         {
+             // 1-2 : datacenter1
+             // 3-4 : datacenter2
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 
'datacenter2':0}");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int 
PRIMARY KEY, i int)");
+             for (int i = 0; i < 10; i++)
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i);
+             cluster.forEach(i -> i.flush(KEYSPACE));
+ 
+             // choose a node in the DC that doesn't have any replicas
+             IInvokableInstance node = cluster.get(3);
 -            assertEquals("datacenter2", node.config().localDatacenter());
++            
Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter2");
+             // fails with "Specified hosts [127.0.0.3, 127.0.0.1] do not share range (0,1000] needed for repair. Either restrict repair ranges with -st/-et options, or specify one of the neighbors that share this range with this node: [].. Check the logs on the repair participants for further details"
+             node.nodetoolResult("repair", "-full",
+                                 "-hosts", 
cluster.get(1).broadcastAddress().getAddress().getHostAddress(),
+                                 "-hosts", 
node.broadcastAddress().getAddress().getHostAddress(),
+                                 "--ignore-unreplicated-keyspaces",
+                                 "-st", "0", "-et", "1000",
+                                 KEYSPACE, "tbl")
+                 .asserts().success();
+         }
+     }
+ 
+     @Test
+     public void emptyDC() throws IOException
+     {
+         try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start())
+         {
+             // 1-2 : datacenter1
+             // 3-4 : datacenter2
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 
'datacenter2':0}");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int 
PRIMARY KEY, i int)");
+             for (int i = 0; i < 10; i++)
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i);
+             cluster.forEach(i -> i.flush(KEYSPACE));
+ 
+             // choose a node in the DC that doesn't have any replicas
+             IInvokableInstance node = cluster.get(3);
 -            assertEquals("datacenter2", node.config().localDatacenter());
++            
Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter2");
+             // fails with [2020-09-10 11:30:04,139] Repair command #1 failed with error Nothing to repair for (0,1000] in distributed_test_keyspace - aborting. Check the logs on the repair participants for further details
+             node.nodetoolResult("repair", "-full",
+                                 "--ignore-unreplicated-keyspaces",
+                                 "-st", "0", "-et", "1000",
+                                 KEYSPACE, "tbl")
+                 .asserts().success();
+         }
+     }
+ 
+     @Test
+     public void mainDC() throws IOException
+     {
+         try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start())
+         {
+             // 1-2 : datacenter1
+             // 3-4 : datacenter2
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 
'datacenter2':0}");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int 
PRIMARY KEY, i int)");
+             for (int i = 0; i < 10; i++)
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i);
+             cluster.forEach(i -> i.flush(KEYSPACE));
+ 
+             // choose a node in the DC that holds the replicas (datacenter1)
+             IInvokableInstance node = cluster.get(1);
 -            assertEquals("datacenter1", node.config().localDatacenter());
++            
Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter1");
+             node.nodetoolResult("repair", "-full",
+                                 "--ignore-unreplicated-keyspaces",
+                                 "-st", "0", "-et", "1000",
+                                 KEYSPACE, "tbl")
+                 .asserts().success();
+         }
+     }
+ 
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to