This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0a1e8d168bac0f562774814a808e313e1d2d6571
Merge: c092c46 c8c3c26
Author: Sam Tunnicliffe <s...@beobal.com>
AuthorDate: Fri Jun 12 11:21:14 2020 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 CHANGES.txt                                        |  1 +
 .../cassandra/cql3/functions/UDFunction.java       | 22 ++++++++--------
 src/java/org/apache/cassandra/db/Directories.java  | 28 ++++++++++-----------
 ...Directories.java => DisallowedDirectories.java} | 29 +++++++++++-----------
 ...sMBean.java => DisallowedDirectoriesMBean.java} |  3 ++-
 .../db/compaction/AbstractCompactionStrategy.java  |  6 ++---
 .../cassandra/db/compaction/LeveledManifest.java   |  2 +-
 .../cassandra/hints/HintsDispatchExecutor.java     |  2 +-
 .../org/apache/cassandra/hints/HintsStore.java     | 10 ++++----
 .../cassandra/io/sstable/format/SSTableReader.java |  2 +-
 .../cassandra/service/DefaultFSErrorHandler.java   |  6 ++---
 test/unit/org/apache/cassandra/Util.java           |  4 +--
 .../org/apache/cassandra/db/DirectoriesTest.java   |  2 +-
 ....java => CorruptedSSTablesCompactionsTest.java} | 17 +++++++------
 14 files changed, 69 insertions(+), 65 deletions(-)

diff --cc CHANGES.txt
index 3fdbb96,b10a057..d506dc8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,19 -1,5 +1,20 @@@
 -2.2.18
 +3.0.21
 + * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
 + * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
 + * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
 + * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
 + * Fix index queries on partition key columns when some partitions contains only static data (CASSANDRA-13666)
 + * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
 + * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
 + * Fix Debian init start/stop (CASSANDRA-15770)
 + * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
 + * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595)
 + * cqlsh return non-zero status when STDIN CQL fails (CASSANDRA-15623)
 + * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690)
 + * Memtable memory allocations may deadlock (CASSANDRA-15367)
 + * Run evictFromMembership in GossipStage (CASSANDRA-15592)
 +Merged from 2.2:
+  * Fix nomenclature of allow and deny lists (CASSANDRA-15862)
   * Remove generated files from source artifact (CASSANDRA-15849)
   * Remove duplicated tools binaries from tarballs (CASSANDRA-15768)
   * Duplicate results with DISTINCT queries in mixed mode (CASSANDRA-15501)
diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 7b69342,1e5cea6..27f9eb8
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@@ -71,111 -49,10 +71,111 @@@ public abstract class UDFunction extend
      protected final String language;
      protected final String body;
  
 -    protected final DataType[] argDataTypes;
 -    protected final DataType returnDataType;
 +    protected final TypeCodec<Object>[] argCodecs;
 +    protected final TypeCodec<Object> returnCodec;
      protected final boolean calledOnNullInput;
  
 +    //
-     // Access to classes is controlled via a whitelist and a blacklist.
++    // Access to classes is controlled via allow and disallow lists.
 +    //
 +    // When a class is requested (both during compilation and runtime),
-     // the whitelistedPatterns array is searched first, whether the
++    // the allowedPatterns array is searched first, whether the
 +    // requested name matches one of the patterns. If not, nothing is
 +    // returned from the class-loader - meaning ClassNotFoundException
 +    // during runtime and "type could not resolved" during compilation.
 +    //
-     // If a whitelisted pattern has been found, the blacklistedPatterns
++    // If an allowed pattern has been found, the disallowedPatterns
 +    // array is searched for a match. If a match is found, class-loader
 +    // rejects access. Otherwise the class/resource can be loaded.
 +    //
-     private static final String[] whitelistedPatterns =
++    private static final String[] allowedPatterns =
 +    {
 +    "com/datastax/driver/core/",
 +    "com/google/common/reflect/TypeToken",
 +    "java/io/IOException.class",
 +    "java/io/Serializable.class",
 +    "java/lang/",
 +    "java/math/",
 +    "java/net/InetAddress.class",
 +    "java/net/Inet4Address.class",
 +    "java/net/Inet6Address.class",
 +    "java/net/UnknownHostException.class", // req'd by InetAddress
 +    "java/net/NetworkInterface.class", // req'd by InetAddress
 +    "java/net/SocketException.class", // req'd by InetAddress
 +    "java/nio/Buffer.class",
 +    "java/nio/ByteBuffer.class",
 +    "java/text/",
 +    "java/time/",
 +    "java/util/",
 +    "org/apache/cassandra/cql3/functions/JavaUDF.class",
 +    "org/apache/cassandra/exceptions/",
 +    };
-     // Only need to blacklist a pattern, if it would otherwise be allowed via whitelistedPatterns
-     private static final String[] blacklistedPatterns =
++    // Only need to disallow a pattern, if it would otherwise be allowed via allowedPatterns
++    private static final String[] disallowedPatterns =
 +    {
 +    "com/datastax/driver/core/Cluster.class",
 +    "com/datastax/driver/core/Metrics.class",
 +    "com/datastax/driver/core/NettyOptions.class",
 +    "com/datastax/driver/core/Session.class",
 +    "com/datastax/driver/core/Statement.class",
 +    "com/datastax/driver/core/TimestampGenerator.class", // indirectly covers 
ServerSideTimestampGenerator + ThreadLocalMonotonicTimestampGenerator
 +    "java/lang/Compiler.class",
 +    "java/lang/InheritableThreadLocal.class",
 +    "java/lang/Package.class",
 +    "java/lang/Process.class",
 +    "java/lang/ProcessBuilder.class",
 +    "java/lang/ProcessEnvironment.class",
 +    "java/lang/ProcessImpl.class",
 +    "java/lang/Runnable.class",
 +    "java/lang/Runtime.class",
 +    "java/lang/Shutdown.class",
 +    "java/lang/Thread.class",
 +    "java/lang/ThreadGroup.class",
 +    "java/lang/ThreadLocal.class",
 +    "java/lang/instrument/",
 +    "java/lang/invoke/",
 +    "java/lang/management/",
 +    "java/lang/ref/",
 +    "java/lang/reflect/",
 +    "java/util/ServiceLoader.class",
 +    "java/util/Timer.class",
 +    "java/util/concurrent/",
 +    "java/util/function/",
 +    "java/util/jar/",
 +    "java/util/logging/",
 +    "java/util/prefs/",
 +    "java/util/spi/",
 +    "java/util/stream/",
 +    "java/util/zip/",
 +    };
 +
 +    static boolean secureResource(String resource)
 +    {
 +        while (resource.startsWith("/"))
 +            resource = resource.substring(1);
 +
-         for (String white : whitelistedPatterns)
-             if (resource.startsWith(white))
++        for (String allowed : allowedPatterns)
++            if (resource.startsWith(allowed))
 +            {
 +
-                 // resource is in whitelistedPatterns, let's see if it is not explicityl blacklisted
-                 for (String black : blacklistedPatterns)
-                     if (resource.startsWith(black))
++                // resource is in allowedPatterns, let's see if it is not explicitly disallowed
++                for (String disallowed : disallowedPatterns)
++                    if (resource.startsWith(disallowed))
 +                    {
 +                        logger.trace("access denied: resource {}", resource);
 +                        return false;
 +                    }
 +
 +                return true;
 +            }
 +
 +        logger.trace("access denied: resource {}", resource);
 +        return false;
 +    }
 +
 +    // setup the UDF class loader with no parent class loader so that we have full control about what class/resource UDF uses
 +    static final ClassLoader udfClassLoader = new UDFClassLoader();
 +
      protected UDFunction(FunctionName name,
                           List<ColumnIdentifier> argNames,
                           List<AbstractType<?>> argTypes,
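For illustration, a minimal standalone sketch of the allow-then-deny lookup implemented by secureResource above (pattern lists trimmed to two entries each; this sketch is not part of the commit):

    // Sketch of the renamed lookup order: an allow prefix must match first,
    // then any matching disallow prefix still rejects the resource.
    public class SecureResourceSketch
    {
        private static final String[] allowedPatterns = { "java/lang/", "java/util/" };
        private static final String[] disallowedPatterns = { "java/lang/reflect/", "java/util/concurrent/" };

        static boolean secureResource(String resource)
        {
            while (resource.startsWith("/"))
                resource = resource.substring(1);

            for (String allowed : allowedPatterns)
                if (resource.startsWith(allowed))
                {
                    for (String disallowed : disallowedPatterns)
                        if (resource.startsWith(disallowed))
                            return false; // allowed prefix, but explicitly disallowed
                    return true;
                }
            return false; // no allow prefix matched at all
        }

        public static void main(String[] args)
        {
            System.out.println(secureResource("java/util/UUID.class"));                // true
            System.out.println(secureResource("java/util/concurrent/Executor.class")); // false
            System.out.println(secureResource("java/net/Socket.class"));               // false
        }
    }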
diff --cc src/java/org/apache/cassandra/db/Directories.java
index b104509,d1aa650..af3f63c
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -331,37 -313,9 +331,37 @@@ public class Directorie
      }
  
      /**
-      * Returns a temporary subdirectory on non-blacklisted data directory
++     * Returns a temporary subdirectory on allowed data directory
 +     * that _currently_ has {@code writeSize} bytes as usable space.
 +     * This method does not create the temporary directory.
 +     *
-      * @throws IOError if all directories are blacklisted.
++     * @throws IOError if all directories are disallowed.
 +     */
 +    public File getTemporaryWriteableDirectoryAsFile(long writeSize)
 +    {
 +        File location = getLocationForDisk(getWriteableLocation(writeSize));
 +        if (location == null)
 +            return null;
 +        return new File(location, TMP_SUBDIR);
 +    }
 +
 +    public void removeTemporaryDirectories()
 +    {
 +        for (File dataDir : dataPaths)
 +        {
 +            File tmpDir = new File(dataDir, TMP_SUBDIR);
 +            if (tmpDir.exists())
 +            {
 +                logger.debug("Removing temporary directory {}", tmpDir);
 +                FileUtils.deleteRecursive(tmpDir);
 +            }
 +        }
 +    }
 +
 +    /**
-      * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space.
+      * Returns an allowed data directory that _currently_ has {@code writeSize} bytes as usable space.
       *
-      * @throws IOError if all directories are blacklisted.
+      * @throws IOError if all directories are disallowed.
       */
      public DataDirectory getWriteableLocation(long writeSize)
      {
@@@ -369,13 -323,13 +369,13 @@@
  
          long totalAvailable = 0L;
  
-         // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
+         // pick directories with enough space and so that resulting sstable dirs aren't disallowed for writes.
          boolean tooBig = false;
 -        for (DataDirectory dataDir : dataDirectories)
 +        for (DataDirectory dataDir : paths)
          {
-             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+             if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir)))
              {
-                 logger.trace("removing blacklisted candidate {}", dataDir.location);
+                 logger.trace("removing disallowed candidate {}", dataDir.location);
                  continue;
              }
             DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
@@@ -392,9 -346,9 +392,9 @@@
  
          if (candidates.isEmpty())
              if (tooBig)
 -                throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
 +                throw new FSDiskFullWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), "");
              else
-                 throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), "");
+                 throw new FSWriteError(new IOException("All configured data directories have been disallowed as unwritable for erroring out"), "");
  
          // shortcut for single data directory systems
          if (candidates.size() == 1)
@@@ -437,9 -391,9 +437,9 @@@
          long writeSize = expectedTotalWriteSize / estimatedSSTables;
          long totalAvailable = 0L;
  
 -        for (DataDirectory dataDir : dataDirectories)
 +        for (DataDirectory dataDir : paths)
          {
-             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+             if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir)))
                    continue;
             DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
             // exclude directory if its total writeSize does not fit to data directory
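A simplified sketch of the selection flow in getWriteableLocation above (not part of the commit; DataDir and the error handling are stand-ins, not the real Directories API):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class WriteableLocationSketch
    {
        // hypothetical stand-in for DataDirectory plus DisallowedDirectories state
        static class DataDir { long usableBytes; boolean disallowed; }

        static DataDir pick(List<DataDir> dirs, long writeSize) throws IOException
        {
            List<DataDir> candidates = new ArrayList<>();
            boolean tooBig = false;
            for (DataDir dir : dirs)
            {
                if (dir.disallowed)
                    continue;                 // mirrors DisallowedDirectories.isUnwritable(...)
                if (dir.usableBytes < writeSize)
                {
                    tooBig = true;            // remember we failed on space, not on disallow state
                    continue;
                }
                candidates.add(dir);
            }
            if (candidates.isEmpty())
                throw tooBig ? new IOException("Insufficient disk space to write " + writeSize + " bytes")
                             : new IOException("All configured data directories have been disallowed as unwritable");
            return candidates.get(0);         // the real code weights candidates by available space
        }
    }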
diff --cc src/java/org/apache/cassandra/db/DisallowedDirectories.java
index afa726b,c0518e2..75b5e79
--- a/src/java/org/apache/cassandra/db/DisallowedDirectories.java
+++ b/src/java/org/apache/cassandra/db/DisallowedDirectories.java
@@@ -25,16 -25,14 +25,16 @@@ import java.util.Collections
  import java.util.Set;
  import java.util.concurrent.CopyOnWriteArraySet;
  
 +import com.google.common.annotations.VisibleForTesting;
 +
- import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.MBeanWrapper;
  
- public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
+ public class DisallowedDirectories implements DisallowedDirectoriesMBean
  {
-     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BlacklistedDirectories";
-     private static final Logger logger = LoggerFactory.getLogger(BlacklistedDirectories.class);
-     private static final BlacklistedDirectories instance = new BlacklistedDirectories();
+     public static final String DEPRECATED_MBEAN_NAME = "org.apache.cassandra.db:type=BlacklistedDirectories";
+     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=DisallowedDirectories";
+     private static final Logger logger = LoggerFactory.getLogger(DisallowedDirectories.class);
+     private static final DisallowedDirectories instance = new DisallowedDirectories();
  
      private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>();
      private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>();
@@@ -90,19 -89,8 +91,19 @@@
      }
  
      /**
 +     * Testing only!
 +     * Clear the set of unwritable directories.
 +     */
 +    @VisibleForTesting
 +    public static void clearUnwritableUnsafe()
 +    {
 +        instance.unwritableDirectories.clear();
 +    }
 +
 +
 +    /**
-      * Tells whether or not the directory is blacklisted for reads.
-      * @return whether or not the directory is blacklisted for reads.
+      * Tells whether or not the directory is disallowed for reads.
+      * @return whether or not the directory is disallowed for reads.
       */
      public static boolean isUnreadable(File directory)
      {
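Since only the MBean name changes here (the old name is kept as DEPRECATED_MBEAN_NAME), a hedged sketch of how a JMX client might reach the renamed bean; the "markUnwritable" operation name and signature are assumptions for illustration, not confirmed by this diff:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    class DisallowedDirectoriesJmxSketch
    {
        public static void main(String[] args) throws Exception
        {
            // 7199 is Cassandra's default JMX port
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // new name from MBEAN_NAME above; DEPRECATED_MBEAN_NAME keeps old clients working
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=DisallowedDirectories");
                // operation name and signature assumed for illustration only
                mbs.invoke(name, "markUnwritable",
                           new Object[]{ "/var/lib/cassandra/data" },
                           new String[]{ String.class.getName() });
            }
        }
    }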
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 2348d19,f9ed780..7219504
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -247,27 -243,22 +247,27 @@@ public abstract class AbstractCompactio
      }
  
      /**
-      * Filters SSTables that are to be blacklisted from the given collection
+      * Filters SSTables that are to be excluded from the given collection
       *
-      * @param originalCandidates The collection to check for blacklisted SSTables
-      * @return list of the SSTables with blacklisted ones filtered out
+      * @param originalCandidates The collection to check for excluded SSTables
+      * @return list of the SSTables with excluded ones filtered out
       */
 -    public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
 +    public static List<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
      {
 -        return Iterables.filter(originalCandidates, new Predicate<SSTableReader>()
 +        List<SSTableReader> filtered = new ArrayList<>();
 +        for (SSTableReader sstable : originalCandidates)
          {
 -            public boolean apply(SSTableReader sstable)
 -            {
 -                return !sstable.isMarkedSuspect();
 -            }
 -        });
 +            if (!sstable.isMarkedSuspect())
 +                filtered.add(sstable);
 +        }
 +        return filtered;
      }
  
 +
 +    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
 +    {
 +        return range == null ? getScanners(sstables, (Collection<Range<Token>>)null) : getScanners(sstables, Collections.singleton(range));
 +    }
      /**
      * Returns a list of KeyScanners given sstables and a range on which to scan.
      * The default implementation simply grab one SSTableScanner per-sstable, but overriding this method
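One behavioral note on filterSuspectSSTables in the hunk above: it now returns an eagerly materialized List rather than a lazy Guava view. A hypothetical caller (sketch only, not part of the commit; the sstables variable is a placeholder):

    // The eager List can be checked for emptiness and iterated repeatedly
    // without re-evaluating a predicate view on each pass.
    List<SSTableReader> candidates = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
    if (candidates.isEmpty())
        return null; // nothing non-suspect to compact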
diff --cc src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index d6c28d4,0000000..c562dd0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@@ -1,309 -1,0 +1,309 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.io.File;
 +import java.net.InetAddress;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.function.BooleanSupplier;
 +import java.util.function.Function;
 +import java.util.function.Supplier;
 +
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.io.FSReadError;
 +import org.apache.cassandra.service.StorageService;
 +
 +/**
 + * A multi-threaded (by default) executor for dispatching hints.
 + *
 + * Most of dispatch is triggered by {@link HintsDispatchTrigger} running every ~10 seconds.
 + */
 +final class HintsDispatchExecutor
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
 +
 +    private final File hintsDirectory;
 +    private final ExecutorService executor;
 +    private final AtomicBoolean isPaused;
 +    private final Function<InetAddress, Boolean> isAlive;
 +    private final Map<UUID, Future> scheduledDispatches;
 +
 +    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Function<InetAddress, Boolean> isAlive)
 +    {
 +        this.hintsDirectory = hintsDirectory;
 +        this.isPaused = isPaused;
 +        this.isAlive = isAlive;
 +
 +        scheduledDispatches = new ConcurrentHashMap<>();
 +        executor = new JMXEnabledThreadPoolExecutor(maxThreads, 1, TimeUnit.MINUTES,
 +                                                    new LinkedBlockingQueue<>(),
 +                                                    new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY),
 +                                                    "internal");
 +    }
 +
 +    /*
 +     * It's safe to terminate dispatch in process and to deschedule dispatch.
 +     */
 +    void shutdownBlocking()
 +    {
 +        scheduledDispatches.clear();
 +        executor.shutdownNow();
 +        try
 +        {
 +            executor.awaitTermination(1, TimeUnit.MINUTES);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +    }
 +
 +    boolean isScheduled(HintsStore store)
 +    {
 +        return scheduledDispatches.containsKey(store.hostId);
 +    }
 +
 +    Future dispatch(HintsStore store)
 +    {
 +        return dispatch(store, store.hostId);
 +    }
 +
 +    Future dispatch(HintsStore store, UUID hostId)
 +    {
 +        /*
 +         * It is safe to perform dispatch for the same host id concurrently in two or more threads,
 +         * however there is nothing to win from it - so we don't.
 +         *
 +         * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination
 +         * rate limit, without having to share a ratelimiter between threads.
 +         *
 +         * It also simplifies reasoning about dispatch sessions.
 +         */
 +        return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
 +    }
 +
 +    Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
 +    {
 +        return executor.submit(new TransferHintsTask(catalog, hostIdSupplier));
 +    }
 +
 +    void completeDispatchBlockingly(HintsStore store)
 +    {
 +        Future future = scheduledDispatches.get(store.hostId);
 +        try
 +        {
 +            if (future != null)
 +                future.get();
 +        }
 +        catch (ExecutionException | InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    void interruptDispatch(UUID hostId)
 +    {
 +        Future future = scheduledDispatches.remove(hostId);
 +
 +        if (null != future)
 +            future.cancel(true);
 +    }
 +
 +    private final class TransferHintsTask implements Runnable
 +    {
 +        private final HintsCatalog catalog;
 +
 +        /*
 +         * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest.
 +         * We use a supplier here to be able to get a new host if the current one dies during streaming.
 +         */
 +        private final Supplier<UUID> hostIdSupplier;
 +
 +        private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
 +        {
 +            this.catalog = catalog;
 +            this.hostIdSupplier = hostIdSupplier;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            UUID hostId = hostIdSupplier.get();
 +            InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
 +            logger.info("Transferring all hints to {}: {}", address, hostId);
 +            if (transfer(hostId))
 +                return;
 +
 +            logger.warn("Failed to transfer all hints to {}: {}; will retry 
in {} seconds", address, hostId, 10);
 +
 +            try
 +            {
 +                TimeUnit.SECONDS.sleep(10);
 +            }
 +            catch (InterruptedException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +
 +            hostId = hostIdSupplier.get();
 +            logger.info("Transferring all hints to {}: {}", address, hostId);
 +            if (!transfer(hostId))
 +            {
 +                logger.error("Failed to transfer all hints to {}: {}", 
address, hostId);
 +                throw new RuntimeException("Failed to transfer all hints to " 
+ hostId);
 +            }
 +        }
 +
 +        private boolean transfer(UUID hostId)
 +        {
 +            catalog.stores()
 +                   .map(store -> new DispatchHintsTask(store, hostId))
 +                   .forEach(Runnable::run);
 +
 +            return !catalog.hasFiles();
 +        }
 +    }
 +
 +    private final class DispatchHintsTask implements Runnable
 +    {
 +        private final HintsStore store;
 +        private final UUID hostId;
 +        private final RateLimiter rateLimiter;
 +
 +        DispatchHintsTask(HintsStore store, UUID hostId)
 +        {
 +            this.store = store;
 +            this.hostId = hostId;
 +
 +            // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
 +            // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
 +            // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
 +            // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
 +            // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
 +            int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
 +            int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount;
 +            this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 +        }
 +
 +        public void run()
 +        {
 +            try
 +            {
 +                dispatch();
 +            }
 +            finally
 +            {
 +                scheduledDispatches.remove(hostId);
 +            }
 +        }
 +
 +        private void dispatch()
 +        {
 +            while (true)
 +            {
 +                if (isPaused.get())
 +                    break;
 +
 +                HintsDescriptor descriptor = store.poll();
 +                if (descriptor == null)
 +                    break;
 +
 +                try
 +                {
 +                    if (!dispatch(descriptor))
 +                        break;
 +                }
 +                catch (FSReadError e)
 +                {
 +                    logger.error("Failed to dispatch hints file {}: file is 
corrupted ({})", descriptor.fileName(), e);
 +                    store.cleanUp(descriptor);
-                     store.blacklist(descriptor);
++                    store.markCorrupted(descriptor);
 +                    throw e;
 +                }
 +            }
 +        }
 +
 +        /*
 +         * Will return true if dispatch was successful, false if we hit a failure (destination node went down, for example).
 +         */
 +        private boolean dispatch(HintsDescriptor descriptor)
 +        {
 +            logger.trace("Dispatching hints file {}", descriptor.fileName());
 +
 +            InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
 +            if (address != null)
 +                return deliver(descriptor, address);
 +
 +            // address == null means the target no longer exist; find new home for each hint entry.
 +            convert(descriptor);
 +            return true;
 +        }
 +
 +        private boolean deliver(HintsDescriptor descriptor, InetAddress 
address)
 +        {
 +            File file = new File(hintsDirectory, descriptor.fileName());
 +            InputPosition offset = store.getDispatchOffset(descriptor);
 +
 +            BooleanSupplier shouldAbort = () -> !isAlive.apply(address) || isPaused.get();
 +            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, shouldAbort))
 +            {
 +                if (offset != null)
 +                    dispatcher.seek(offset);
 +
 +                if (dispatcher.dispatch())
 +                {
 +                    store.delete(descriptor);
 +                    store.cleanUp(descriptor);
 +                    logger.info("Finished hinted handoff of file {} to 
endpoint {}: {}", descriptor.fileName(), address, hostId);
 +                    return true;
 +                }
 +                else
 +                {
 +                    store.markDispatchOffset(descriptor, dispatcher.dispatchPosition());
 +                    store.offerFirst(descriptor);
 +                    logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", descriptor.fileName(), address, hostId);
 +                    return false;
 +                }
 +            }
 +        }
 +
 +        // for each hint in the hints file for a node that isn't part of the ring anymore, write RF hints for each replica
 +        private void convert(HintsDescriptor descriptor)
 +        {
 +            File file = new File(hintsDirectory, descriptor.fileName());
 +
 +            try (HintsReader reader = HintsReader.open(file, rateLimiter))
 +            {
 +                reader.forEach(page -> page.hintsIterator().forEachRemaining(HintsService.instance::writeForAllReplicas));
 +                store.delete(descriptor);
 +                store.cleanUp(descriptor);
 +                logger.info("Finished converting hints file {}", 
descriptor.fileName());
 +            }
 +        }
 +    }
 +}
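A worked example of the per-destination throttle arithmetic in DispatchHintsTask above (not part of the commit; numbers invented for illustration):

    import com.google.common.util.concurrent.RateLimiter;

    class HintThrottleSketch
    {
        public static void main(String[] args)
        {
            int peers = 8;                              // 9-node cluster: getAllEndpoints().size() - 1
            int configuredThrottleInKB = 1024;          // hinted_handoff_throttle_in_kb in cassandra.yaml
            int nodesCount = Math.max(1, peers);        // guard against single-node clusters
            int throttleInKB = configuredThrottleInKB / nodesCount;           // 128 KB/s per destination
            RateLimiter limiter = RateLimiter.create(throttleInKB == 0
                                                     ? Double.MAX_VALUE       // 0 in yaml means unthrottled
                                                     : throttleInKB * 1024);  // permits are bytes per second
            System.out.println(limiter.getRate());      // prints 131072.0
        }
    }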
diff --cc src/java/org/apache/cassandra/hints/HintsStore.java
index 032de5a,0000000..b08fc72
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@@ -1,220 -1,0 +1,220 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentLinkedDeque;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableMap;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +/**
 + * Encapsulates the state of a peer's hints: the queue of hints files for dispatch, and the current writer (if any).
 + *
 + * The queue for dispatch is multi-threading safe.
 + *
 + * The writer MUST only be accessed by {@link HintsWriteExecutor}.
 + */
 +final class HintsStore
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintsStore.class);
 +
 +    public final UUID hostId;
 +    private final File hintsDirectory;
 +    private final ImmutableMap<String, Object> writerParams;
 +
 +    private final Map<HintsDescriptor, InputPosition> dispatchPositions;
 +    private final Deque<HintsDescriptor> dispatchDequeue;
-     private final Queue<HintsDescriptor> blacklistedFiles;
++    private final Queue<HintsDescriptor> corruptedFiles;
 +
 +    // last timestamp used in a descriptor; make sure to not reuse the same timestamp for new descriptors.
 +    private volatile long lastUsedTimestamp;
 +    private volatile HintsWriter hintsWriter;
 +
 +    private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
 +    {
 +        this.hostId = hostId;
 +        this.hintsDirectory = hintsDirectory;
 +        this.writerParams = writerParams;
 +
 +        dispatchPositions = new ConcurrentHashMap<>();
 +        dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
-         blacklistedFiles = new ConcurrentLinkedQueue<>();
++        corruptedFiles = new ConcurrentLinkedQueue<>();
 +
 +        //noinspection resource
 +        lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
 +    }
 +
 +    static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
 +    {
 +        descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
 +        return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
 +    }
 +
 +    @VisibleForTesting
 +    int getDispatchQueueSize()
 +    {
 +        return dispatchDequeue.size();
 +    }
 +
 +    InetAddress address()
 +    {
 +        return StorageService.instance.getEndpointForHostId(hostId);
 +    }
 +
 +    boolean isLive()
 +    {
 +        InetAddress address = address();
 +        return address != null && FailureDetector.instance.isAlive(address);
 +    }
 +
 +    HintsDescriptor poll()
 +    {
 +        return dispatchDequeue.poll();
 +    }
 +
 +    void offerFirst(HintsDescriptor descriptor)
 +    {
 +        dispatchDequeue.offerFirst(descriptor);
 +    }
 +
 +    void offerLast(HintsDescriptor descriptor)
 +    {
 +        dispatchDequeue.offerLast(descriptor);
 +    }
 +
 +    void deleteAllHints()
 +    {
 +        HintsDescriptor descriptor;
 +        while ((descriptor = poll()) != null)
 +        {
 +            cleanUp(descriptor);
 +            delete(descriptor);
 +        }
 +
-         while ((descriptor = blacklistedFiles.poll()) != null)
++        while ((descriptor = corruptedFiles.poll()) != null)
 +        {
 +            cleanUp(descriptor);
 +            delete(descriptor);
 +        }
 +    }
 +
 +    void delete(HintsDescriptor descriptor)
 +    {
 +        File hintsFile = new File(hintsDirectory, descriptor.fileName());
 +        if (hintsFile.delete())
 +            logger.info("Deleted hint file {}", descriptor.fileName());
 +        else
 +            logger.error("Failed to delete hint file {}", 
descriptor.fileName());
 +
 +        //noinspection ResultOfMethodCallIgnored
 +        new File(hintsDirectory, descriptor.checksumFileName()).delete();
 +    }
 +
 +    boolean hasFiles()
 +    {
 +        return !dispatchDequeue.isEmpty();
 +    }
 +
 +    InputPosition getDispatchOffset(HintsDescriptor descriptor)
 +    {
 +        return dispatchPositions.get(descriptor);
 +    }
 +
 +    void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
 +    {
 +        dispatchPositions.put(descriptor, inputPosition);
 +    }
 +
 +    void cleanUp(HintsDescriptor descriptor)
 +    {
 +        dispatchPositions.remove(descriptor);
 +    }
 +
-     void blacklist(HintsDescriptor descriptor)
++    void markCorrupted(HintsDescriptor descriptor)
 +    {
-         blacklistedFiles.add(descriptor);
++        corruptedFiles.add(descriptor);
 +    }
 +
 +    /*
 +     * Methods dealing with HintsWriter.
 +     *
 +     * All of these, with the exception of isWriting(), are for exclusively single-threaded use by HintsWriteExecutor.
 +     */
 +
 +    boolean isWriting()
 +    {
 +        return hintsWriter != null;
 +    }
 +
 +    HintsWriter getOrOpenWriter()
 +    {
 +        if (hintsWriter == null)
 +            hintsWriter = openWriter();
 +        return hintsWriter;
 +    }
 +
 +    HintsWriter getWriter()
 +    {
 +        return hintsWriter;
 +    }
 +
 +    private HintsWriter openWriter()
 +    {
 +        lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
 +        HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
 +
 +        try
 +        {
 +            return HintsWriter.create(hintsDirectory, descriptor);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, descriptor.fileName());
 +        }
 +    }
 +
 +    void closeWriter()
 +    {
 +        if (hintsWriter != null)
 +        {
 +            hintsWriter.close();
 +            offerLast(hintsWriter.descriptor());
 +            hintsWriter = null;
 +            SyncUtil.trySyncDir(hintsDirectory);
 +        }
 +    }
 +
 +    void fsyncWriter()
 +    {
 +        if (hintsWriter != null)
 +            hintsWriter.fsync();
 +    }
 +}
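The openWriter() timestamp handling above keeps descriptor timestamps strictly increasing even if the wall clock stalls or steps backwards; a minimal sketch of just that invariant (not part of the commit):

    class DescriptorTimestampSketch
    {
        private volatile long lastUsedTimestamp;

        // mirrors: lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1)
        long next(long nowMillis)
        {
            lastUsedTimestamp = Math.max(nowMillis, lastUsedTimestamp + 1);
            return lastUsedTimestamp;
        }

        public static void main(String[] args)
        {
            DescriptorTimestampSketch s = new DescriptorTimestampSketch();
            System.out.println(s.next(1000)); // 1000
            System.out.println(s.next(1000)); // 1001 - clock stalled, timestamp still unique
            System.out.println(s.next(900));  // 1002 - clock stepped back, still increasing
        }
    }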
diff --cc test/unit/org/apache/cassandra/Util.java
index d758efe,f6b4771..a49440d
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -539,113 -383,4 +539,113 @@@ public class Uti
      {
          thread.join(10000);
      }
 +
 +    public static AssertionError runCatchingAssertionError(Runnable test)
 +    {
 +        try
 +        {
 +            test.run();
 +            return null;
 +        }
 +        catch (AssertionError e)
 +        {
 +            return e;
 +        }
 +    }
 +
 +    /**
 +     * Wrapper function used to run a test that can sometimes flake for uncontrollable reasons.
 +     *
 +     * If the given test fails on the first run, it is executed the given number of times again, expecting all secondary
 +     * runs to succeed. If they do, the failure is understood as a flake and the test is treated as passing.
 +     *
 +     * Do not use this if the test is deterministic and its success is not influenced by external factors (such as time,
 +     * selection of random seed, network failures, etc.). If the test can be made independent of such factors, it is
 +     * probably preferable to do so rather than use this method.
 +     *
 +     * @param test The test to run.
 +     * @param rerunsOnFailure How many times to re-run it if it fails. All reruns must pass.
 +     * @param message Message to send to System.err on initial failure.
 +     */
 +    public static void flakyTest(Runnable test, int rerunsOnFailure, String message)
 +    {
 +        AssertionError e = runCatchingAssertionError(test);
 +        if (e == null)
 +            return;     // success
 +        System.err.format("Test failed. %s%n"
 +                        + "Re-running %d times to verify it isn't failing 
more often than it should.%n"
 +                        + "Failure was: %s%n", message, rerunsOnFailure, e);
 +        e.printStackTrace();
 +
 +        int rerunsFailed = 0;
 +        for (int i = 0; i < rerunsOnFailure; ++i)
 +        {
 +            AssertionError t = runCatchingAssertionError(test);
 +            if (t != null)
 +            {
 +                ++rerunsFailed;
 +                e.addSuppressed(t);
 +            }
 +        }
 +        if (rerunsFailed > 0)
 +        {
 +            System.err.format("Test failed in %d of the %d reruns.%n", 
rerunsFailed, rerunsOnFailure);
 +            throw e;
 +        }
 +
 +        System.err.println("All reruns succeeded. Failure treated as flake.");
 +    }
 +
 +    // for use with Optional in tests, can be used as an argument to orElseThrow
 +    public static Supplier<AssertionError> throwAssert(final String message)
 +    {
 +        return () -> new AssertionError(message);
 +    }
 +
 +    public static class UnfilteredSource extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator
 +    {
 +        Iterator<Unfiltered> content;
 +
 +        public UnfilteredSource(CFMetaData cfm, DecoratedKey partitionKey, Row staticRow, Iterator<Unfiltered> content)
 +        {
 +            super(cfm,
 +                  partitionKey,
 +                  DeletionTime.LIVE,
 +                  cfm.partitionColumns(),
 +                  staticRow != null ? staticRow : Rows.EMPTY_STATIC_ROW,
 +                  false,
 +                  EncodingStats.NO_STATS);
 +            this.content = content;
 +        }
 +
 +        @Override
 +        protected Unfiltered computeNext()
 +        {
 +            return content.hasNext() ? content.next() : endOfData();
 +        }
 +    }
 +
 +    public static UnfilteredPartitionIterator executeLocally(PartitionRangeReadCommand command,
 +                                                             ColumnFamilyStore cfs,
 +                                                             ReadOrderGroup orderGroup)
 +    {
 +        return command.queryStorage(cfs, orderGroup);
 +    }
 +
 +    public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)
 +    {
 +        try
 +        {
 +            for ( ; ; )
 +            {
 +                DataDirectory dir = cfs.getDirectories().getWriteableLocation(1);
-                 BlacklistedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir));
++                DisallowedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir));
 +            }
 +        }
 +        catch (IOError e)
 +        {
 +            // Expected -- marked all directories as unwritable
 +        }
-         return () -> BlacklistedDirectories.clearUnwritableUnsafe();
++        return () -> DisallowedDirectories.clearUnwritableUnsafe();
 +    }
  }
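A hypothetical JUnit-style usage of the two helpers added above (not part of the commit; cfs, assertTimingSensitiveProperty and assertWriteFails are placeholders):

    // flakyTest: rerun a timing-sensitive assertion if it fails once;
    // all reruns must pass for the initial failure to count as a flake.
    Util.flakyTest(() -> assertTimingSensitiveProperty(cfs),
                   3,
                   "property is timing-sensitive; verifying the failure is not persistent");

    // markDirectoriesUnwriteable: disallow every data directory, then restore
    // state through the returned Closeable.
    try (java.io.Closeable restore = Util.markDirectoriesUnwriteable(cfs))
    {
        assertWriteFails(cfs);
    }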
diff --cc test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
index 6378e09,5208401..9276737
--- a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
@@@ -29,34 -29,26 +29,34 @@@ import org.junit.AfterClass
  import org.junit.BeforeClass;
  import org.junit.Test;
  
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertNotNull;
 +
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.LongType;
  import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.locator.SimpleStrategy;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.schema.*;
  
 -import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertTrue;
 -import static org.apache.cassandra.Util.cellname;
  
- public class BlacklistingCompactionsTest
+ public class CorruptedSSTablesCompactionsTest
  {
-     private static final Logger logger = LoggerFactory.getLogger(BlacklistingCompactionsTest.class);
++    private static final Logger logger = LoggerFactory.getLogger(CorruptedSSTablesCompactionsTest.class);
 +
 +    private static Random random;
 +
-     private static final String KEYSPACE1 = "BlacklistingCompactionsTest";
+     private static final String KEYSPACE1 = "CorruptedSSTablesCompactionsTest";
 -    private static final String CF_STANDARD1 = "Standard1";
 -    // seed hardcoded to one we know works:
 -    private static final Random random = new Random(1);
 +    private static final String STANDARD_STCS = "Standard_STCS";
 +    private static final String STANDARD_LCS = "Standard_LCS";
 +    private static int maxValueSize;
  
      @After
      public void leakDetect() throws InterruptedException
@@@ -112,18 -81,18 +112,19 @@@
      }
  
      @Test
-     public void testBlacklistingWithSizeTieredCompactionStrategy() throws Exception
+     public void testCorruptedSSTablesWithSizeTieredCompactionStrategy() throws Exception
      {
-         testBlacklisting(STANDARD_STCS);
 -        testCorruptedSSTables(SizeTieredCompactionStrategy.class.getCanonicalName());
++        testCorruptedSSTables(STANDARD_STCS);
      }
  
      @Test
-     public void testBlacklistingWithLeveledCompactionStrategy() throws Exception
+     public void testCorruptedSSTablesWithLeveledCompactionStrategy() throws Exception
      {
-         testBlacklisting(STANDARD_LCS);
 -        testCorruptedSSTables(LeveledCompactionStrategy.class.getCanonicalName());
++        testCorruptedSSTables(STANDARD_LCS);
      }
  
-     private void testBlacklisting(String tableName) throws Exception
 -    public void testCorruptedSSTables(String compactionStrategy) throws Exception
++
++    public void testCorruptedSSTables(String tableName) throws Exception
      {
          // this test does enough rows to force multiple block indexes to be used
          Keyspace keyspace = Keyspace.open(KEYSPACE1);

