This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit df4180f5782ce7530dc30448a17a4f4dbe4fae44
Merge: 5d930cc eda5db2
Author: Marcus Eriksson <[email protected]>
AuthorDate: Fri Nov 29 09:53:37 2019 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/Directories.java  |  50 ++++---
 .../org/apache/cassandra/db/DiskBoundaries.java    |  11 +-
 .../apache/cassandra/db/DiskBoundaryManager.java   |   6 +-
 .../cassandra/db/compaction/CompactionManager.java |   4 +-
 .../compaction/writers/CompactionAwareWriter.java  |  15 ++-
 .../org/apache/cassandra/io/util/FileUtils.java    |   4 +-
 .../org/apache/cassandra/db/DirectoriesTest.java   | 149 +++++++++++++++++++++
 .../compaction/CompactionStrategyManagerTest.java  |   2 +-
 .../apache/cassandra/io/util/FileUtilsTest.java    |  11 ++
 10 files changed, 214 insertions(+), 39 deletions(-)
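
The bug class this merge addresses (CASSANDRA-13974): matching a file to its
data directory by raw absolute-path prefix misfires when one data directory
path is a string prefix of another. A minimal sketch of the failure mode,
using hypothetical paths (not part of the patch):

    import java.io.File;

    public class PrefixBugSketch
    {
        public static void main(String[] args)
        {
            File dataDir = new File("/tmp/a");
            File sstableDir = new File("/tmp/aa/ks/tbl");

            // Raw string prefix match: true, but wrong - /tmp/aa is not inside /tmp/a
            System.out.println(sstableDir.getAbsolutePath()
                                         .startsWith(dataDir.getAbsolutePath()));

            // Component-wise comparison: false, as expected. The patch goes one
            // step further and keys an ImmutableMap by canonical path instead.
            System.out.println(sstableDir.toPath().startsWith(dataDir.toPath()));
        }
    }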

diff --cc CHANGES.txt
index 918eb1b,cdebc03..3ede6a4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.20
 +3.11.6
 + * Fix SELECT JSON formatting for the "duration" type (CASSANDRA-15075)
 + * Fix LegacyLayout to have same behavior as 2.x when handling unknown column names (CASSANDRA-15081)
 +Merged from 3.0:
+  * Fix various data directory prefix matching issues (CASSANDRA-13974)
   * Minimize clustering values in metadata collector (CASSANDRA-15400)
   * Avoid over-trimming of results in mixed mode clusters (CASSANDRA-15405)
   * validate value sizes in LegacyLayout (CASSANDRA-15373)
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 532bf98,b104509..4731b3e
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -21,11 -23,17 +21,12 @@@ import java.io.File
  import java.io.FileFilter;
  import java.io.IOError;
  import java.io.IOException;
 -import java.nio.file.FileVisitResult;
  import java.nio.file.Files;
  import java.nio.file.Path;
+ import java.nio.file.Paths;
 -import java.nio.file.SimpleFileVisitor;
 -import java.nio.file.attribute.BasicFileAttributes;
  import java.util.*;
  import java.util.concurrent.ThreadLocalRandom;
 -import java.util.concurrent.atomic.AtomicLong;
  import java.util.function.BiFunction;
 -import java.util.function.Consumer;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Predicate;
@@@ -177,6 -185,6 +178,7 @@@ public class Directorie
      private final CFMetaData metadata;
      private final DataDirectory[] paths;
      private final File[] dataPaths;
++    private final ImmutableMap<Path, DataDirectory> canonicalPathToDD;
  
      public Directories(final CFMetaData metadata)
      {
@@@ -199,6 -201,6 +201,8 @@@
          this.metadata = metadata;
          this.paths = paths;
  
++        ImmutableMap.Builder<Path, DataDirectory> canonicalPathsBuilder = ImmutableMap.builder();
++
          String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
          int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
          String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
@@@ -210,27 -212,27 +214,33 @@@
          for (int i = 0; i < paths.length; ++i)
          {
              // check if old SSTable directory exists
--            dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath);
++            File dataPath = new File(paths[i].location, oldSSTableRelativePath);
++            dataPaths[i] = dataPath;
++            canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]);
          }
--        boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
--        {
--            public boolean apply(File file)
--            {
--                return file.exists();
--            }
--        });
++        boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), File::exists);
          if (!olderDirectoryExists)
          {
++            canonicalPathsBuilder = ImmutableMap.builder();
              // use 2.1+ style
              String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId);
              for (int i = 0; i < paths.length; ++i)
--                dataPaths[i] = new File(paths[i].location, newSSTableRelativePath);
++            {
++                File dataPath = new File(paths[i].location, newSSTableRelativePath);
++                dataPaths[i] = dataPath;
++                canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]);
++            }
          }
          // if index, then move to its own directory
          if (indexNameWithDot != null)
          {
++            canonicalPathsBuilder = ImmutableMap.builder();
              for (int i = 0; i < paths.length; ++i)
--                dataPaths[i] = new File(dataPaths[i], indexNameWithDot);
++            {
++                File dataPath = new File(dataPaths[i], indexNameWithDot);
++                dataPaths[i] = dataPath;
++                canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]);
++            }
          }
  
          for (File dir : dataPaths)
@@@ -274,6 -276,6 +284,7 @@@
                  }
              }
          }
++        canonicalPathToDD = canonicalPathsBuilder.build();
      }
  
      /**
@@@ -291,19 -298,6 +307,13 @@@
          return null;
      }
  
-     public DataDirectory getDataDirectoryForFile(File directory)
++    public DataDirectory getDataDirectoryForFile(Descriptor descriptor)
 +    {
-         if (directory != null)
-         {
-             for (DataDirectory dataDirectory : paths)
-             {
-                 if (directory.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
-                     return dataDirectory;
-             }
-         }
++        if (descriptor != null)
++            return canonicalPathToDD.get(descriptor.directory.toPath());
 +        return null;
 +    }
 +
      public Descriptor find(String filename)
      {
          for (File dir : dataPaths)
diff --cc src/java/org/apache/cassandra/db/DiskBoundaries.java
index 7bfed28,0000000..90af893
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@@ -1,131 -1,0 +1,134 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.util.Collections;
 +import java.util.List;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
 +
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.service.StorageService;
 +
 +public class DiskBoundaries
 +{
 +    public final List<Directories.DataDirectory> directories;
 +    public final ImmutableList<PartitionPosition> positions;
 +    final long ringVersion;
 +    final int directoriesVersion;
++    private final ColumnFamilyStore cfs;
 +    private volatile boolean isInvalid = false;
 +
-     public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion)
++    public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, int diskVersion)
 +    {
-         this(directories, null, -1, diskVersion);
++        this(cfs, directories, null, -1, diskVersion);
 +    }
 +
 +    @VisibleForTesting
-     public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
++    public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
 +    {
 +        this.directories = directories == null ? null : ImmutableList.copyOf(directories);
 +        this.positions = positions == null ? null : ImmutableList.copyOf(positions);
 +        this.ringVersion = ringVersion;
 +        this.directoriesVersion = diskVersion;
++        this.cfs = cfs;
 +    }
 +
 +    public boolean equals(Object o)
 +    {
 +        if (this == o) return true;
 +        if (o == null || getClass() != o.getClass()) return false;
 +
 +        DiskBoundaries that = (DiskBoundaries) o;
 +
 +        if (ringVersion != that.ringVersion) return false;
 +        if (directoriesVersion != that.directoriesVersion) return false;
 +        if (!directories.equals(that.directories)) return false;
 +        return positions != null ? positions.equals(that.positions) : that.positions == null;
 +    }
 +
 +    public int hashCode()
 +    {
 +        int result = directories != null ? directories.hashCode() : 0;
 +        result = 31 * result + (positions != null ? positions.hashCode() : 0);
 +        result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32));
 +        result = 31 * result + directoriesVersion;
 +        return result;
 +    }
 +
 +    public String toString()
 +    {
 +        return "DiskBoundaries{" +
 +               "directories=" + directories +
 +               ", positions=" + positions +
 +               ", ringVersion=" + ringVersion +
 +               ", directoriesVersion=" + directoriesVersion +
 +               '}';
 +    }
 +
 +    /**
 +     * check if the given disk boundaries are out of date due to not being set or to having too old a diskVersion/ringVersion
 +     */
 +    public boolean isOutOfDate()
 +    {
 +        if (isInvalid)
 +            return true;
 +        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
 +        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
 +        return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
 +    }
 +
 +    public void invalidate()
 +    {
 +        this.isInvalid = true;
 +    }
 +
 +    public int getDiskIndex(SSTableReader sstable)
 +    {
 +        if (positions == null)
 +        {
 +            return getBoundariesFromSSTableDirectory(sstable);
 +        }
 +
 +        int pos = Collections.binarySearch(positions, sstable.first);
 +        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
 +        return -pos - 1;
 +    }
 +
 +    /**
 +     * Try to figure out location based on sstable directory
 +     */
 +    private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
 +    {
++        Directories.DataDirectory actualDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor);
 +        for (int i = 0; i < directories.size(); i++)
 +        {
 +            Directories.DataDirectory directory = directories.get(i);
-             if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
++            if (actualDirectory != null && actualDirectory.equals(directory))
 +                return i;
 +        }
 +        return 0;
 +    }
 +
 +    public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
 +    {
 +        return directories.get(getDiskIndex(sstable));
 +    }
 +}
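
A note on the getDiskIndex() binary search above: positions holds one upper
key bound per disk, and key bounds never compare equal to a real key, so
Collections.binarySearch() always returns a negative insertion point; negating
it yields the owning disk. A hypothetical sketch with integers standing in for
PartitionPosition bounds:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class DiskIndexSketch
    {
        public static void main(String[] args)
        {
            List<Integer> upperBounds = Arrays.asList(33, 66, 100); // one per disk
            int firstKey = 40; // stand-in for sstable.first

            int pos = Collections.binarySearch(upperBounds, firstKey);
            // pos is negative (no exact match), so -pos - 1 is the insertion
            // point, i.e. the index of the first boundary >= the key.
            System.out.println(-pos - 1); // 1 -> second disk
        }
    }
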
diff --cc src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 03cbf7b,0000000..61febe9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@@ -1,140 -1,0 +1,140 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Splitter;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.TokenMetadata;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class DiskBoundaryManager
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class);
 +    private volatile DiskBoundaries diskBoundaries;
 +
 +    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
-             return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
++            return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
 +        if (diskBoundaries == null || diskBoundaries.isOutOfDate())
 +        {
 +            synchronized (this)
 +            {
 +                if (diskBoundaries == null || diskBoundaries.isOutOfDate())
 +                {
 +                    logger.debug("Refreshing disk boundary cache for {}.{}", 
cfs.keyspace.getName(), cfs.getTableName());
 +                    DiskBoundaries oldBoundaries = diskBoundaries;
 +                    diskBoundaries = getDiskBoundaryValue(cfs);
 +                    logger.debug("Updating boundaries from {} to {} for 
{}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), 
cfs.getTableName());
 +                }
 +            }
 +        }
 +        return diskBoundaries;
 +    }
 +
 +    public void invalidate()
 +    {
 +       if (diskBoundaries != null)
 +           diskBoundaries.invalidate();
 +    }
 +
 +    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
 +    {
 +        Collection<Range<Token>> localRanges;
 +
 +        long ringVersion;
 +        TokenMetadata tmd;
 +        do
 +        {
 +            tmd = StorageService.instance.getTokenMetadata();
 +            ringVersion = tmd.getRingVersion();
 +            if (StorageService.instance.isBootstrapMode()
 +                && !StorageService.isReplacingSameAddress()) // When 
replacing same address, the node marks itself as UN locally
 +            {
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
 +                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), 
FBUtilities.getBroadcastAddress());
 +            }
 +            else
 +            {
 +                // The reason we use the future settled TMD is that if we decommission a node, we want to stream
 +                // from that node to the correct location on disk; if we didn't, we would put new files in the wrong places.
 +                // We do this to minimize the amount of data we need to move in rebalancedisks once everything is settled.
 +                localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress());
 +            }
 +            logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion);
 +        }
 +        while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that
 +                                                     // it might have changed before we calculated localRanges - recalculate
 +
 +        int directoriesVersion;
 +        Directories.DataDirectory[] dirs;
 +        do
 +        {
 +            directoriesVersion = BlacklistedDirectories.getDirectoriesVersion();
 +            dirs = cfs.getDirectories().getWriteableLocations();
 +        }
 +        while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate
 +
 +        if (localRanges == null || localRanges.isEmpty())
-             return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
++            return new DiskBoundaries(cfs, dirs, null, ringVersion, directoriesVersion);
 +
 +        List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
 +
 +        List<PartitionPosition> positions = 
getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
-         return new DiskBoundaries(dirs, positions, ringVersion, 
directoriesVersion);
++        return new DiskBoundaries(cfs, dirs, positions, ringVersion, 
directoriesVersion);
 +    }
 +
 +    /**
 +     * Returns a list of disk boundaries; the result will differ depending on whether vnodes are enabled or not.
 +     *
 +     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
 +     * getDiskBoundaries(..).get(0) should be on the first disk, everything between boundaries 0 and 1 should be on the second disk,
 +     * etc.
 +     *
 +     * The final entry in the returned list will always be the upper key bound of the partitioner's maximum token.
 +     */
 +    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
 +    {
 +        assert partitioner.splitter().isPresent();
 +        Splitter splitter = partitioner.splitter().get();
 +        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
 +        List<Token> boundaries = 
splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, 
dontSplitRanges);
 +        // If we can't split by ranges, split evenly to ensure utilisation of 
all disks
 +        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
 +            boundaries = splitter.splitOwnedRanges(dataDirectories.length, 
sortedLocalRanges, false);
 +
 +        List<PartitionPosition> diskBoundaries = new ArrayList<>();
 +        for (int i = 0; i < boundaries.size() - 1; i++)
 +            diskBoundaries.add(boundaries.get(i).maxKeyBound());
 +        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
 +        return diskBoundaries;
 +    }
 +}
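
For reference, the shape of getDiskBoundaries()' result: splitOwnedRanges()
yields one token per data directory, the first N-1 become per-disk upper
bounds, and the final entry is pinned to the partitioner's maximum token so
the last disk owns the tail of the ring. A hypothetical sketch with longs
standing in for Token.maxKeyBound() values:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BoundaryShapeSketch
    {
        public static void main(String[] args)
        {
            List<Long> splitTokens = Arrays.asList(100L, 200L, 300L); // from the splitter
            long partitionerMax = Long.MAX_VALUE;                     // stand-in

            List<Long> diskBoundaries = new ArrayList<>();
            for (int i = 0; i < splitTokens.size() - 1; i++)
                diskBoundaries.add(splitTokens.get(i)); // per-disk upper bounds
            diskBoundaries.add(partitionerMax);         // last disk owns the tail

            System.out.println(diskBoundaries); // [100, 200, 9223372036854775807]
        }
    }
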
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a08d08b,2b9ee50..56d2d29
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -503,115 -499,6 +503,115 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
 +    public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
 +    {
 +        assert !cfStore.isIndex();
 +
 +        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> 
filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Iterable<SSTableReader> originals = transaction.originals();
 +                if 
(cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
 +                    originals = Iterables.filter(originals, 
SSTableReader::isRepaired);
 +                List<SSTableReader> sortedSSTables = 
Lists.newArrayList(originals);
 +                Collections.sort(sortedSSTables, 
SSTableReader.maxTimestampAscending);
 +                return sortedSSTables;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn) throws IOException
 +            {
 +                logger.debug("Garbage collecting {}", txn.originals());
 +                CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
 +                {
 +                    @Override
 +                    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
 +                    {
 +                        return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption);
 +                    }
 +                };
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.GARBAGE_COLLECT);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.GARBAGE_COLLECT);
 +    }
 +
 +    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
 +        {
 +            logger.info("Partitioner does not support splitting");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +        final Collection<Range<Token>> r = 
StorageService.instance.getLocalRanges(cfs.keyspace.getName());
 +
 +        if (r.isEmpty())
 +        {
 +            logger.info("Relocate cannot run before a node has joined the 
ring");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +
 +        final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
 +
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> 
filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Set<SSTableReader> originals = 
Sets.newHashSet(transaction.originals());
 +                Set<SSTableReader> needsRelocation = 
originals.stream().filter(s -> 
!inCorrectLocation(s)).collect(Collectors.toSet());
 +                transaction.cancel(Sets.difference(originals, 
needsRelocation));
 +
 +                Map<Integer, List<SSTableReader>> groupedByDisk = 
groupByDiskIndex(needsRelocation);
 +
 +                int maxSize = 0;
 +                for (List<SSTableReader> diskSSTables : 
groupedByDisk.values())
 +                    maxSize = Math.max(maxSize, diskSSTables.size());
 +
 +                List<SSTableReader> mixedSSTables = new ArrayList<>();
 +
 +                for (int i = 0; i < maxSize; i++)
 +                    for (List<SSTableReader> diskSSTables : 
groupedByDisk.values())
 +                        if (i < diskSSTables.size())
 +                            mixedSSTables.add(diskSSTables.get(i));
 +
 +                return mixedSSTables;
 +            }
 +
 +            public Map<Integer, List<SSTableReader>> 
groupByDiskIndex(Set<SSTableReader> needsRelocation)
 +            {
 +                return 
needsRelocation.stream().collect(Collectors.groupingBy((s) -> 
diskBoundaries.getDiskIndex(s)));
 +            }
 +
 +            private boolean inCorrectLocation(SSTableReader sstable)
 +            {
 +                if (!cfs.getPartitioner().splitter().isPresent())
 +                    return true;
 +
 +                int diskIndex = diskBoundaries.getDiskIndex(sstable);
-                 File diskLocation = diskBoundaries.directories.get(diskIndex).location;
 +                PartitionPosition diskLast = diskBoundaries.positions.get(diskIndex);
 +
 +                // the location we get from directoryIndex is based on the first key in the sstable
 +                // now we need to make sure the last key is less than the boundary as well:
-                 return sstable.descriptor.directory.getAbsolutePath().startsWith(diskLocation.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
++                Directories.DataDirectory dataDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor);
++                return diskBoundaries.directories.get(diskIndex).equals(dataDirectory) && sstable.last.compareTo(diskLast) <= 0;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn)
 +            {
 +                logger.debug("Relocating {}", txn.originals());
 +                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.RELOCATE);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.RELOCATE);
 +    }
 +
      /**
      * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
       * as repaired.
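
One detail worth calling out in relocateSSTables() above: filterSSTables()
interleaves the per-disk groups round-robin so that parallel relocation jobs
touch all disks instead of draining one disk at a time. A hypothetical sketch
with strings standing in for SSTableReaders:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class InterleaveSketch
    {
        public static void main(String[] args)
        {
            Map<Integer, List<String>> groupedByDisk = new TreeMap<>();
            groupedByDisk.put(0, Arrays.asList("a1", "a2", "a3"));
            groupedByDisk.put(1, Arrays.asList("b1"));

            int maxSize = 0;
            for (List<String> diskSSTables : groupedByDisk.values())
                maxSize = Math.max(maxSize, diskSSTables.size());

            // Round-robin: one sstable per disk per pass.
            List<String> mixed = new ArrayList<>();
            for (int i = 0; i < maxSize; i++)
                for (List<String> diskSSTables : groupedByDisk.values())
                    if (i < diskSSTables.size())
                        mixed.add(diskSSTables.get(i));

            System.out.println(mixed); // [a1, b1, a2, a3]
        }
    }
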
diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index d2f816b,d33d72c..bc6115e
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@@ -34,11 -27,9 +34,12 @@@ import org.apache.cassandra.db.Partitio
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.db.compaction.CompactionTask;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
++import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTableRewriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.concurrent.Transactional;
 +import org.apache.cassandra.db.compaction.OperationType;
  
  
  /**
@@@ -197,45 -147,13 +198,45 @@@ public abstract class CompactionAwareWr
  
      /**
       * Return a directory where we can expect expectedWriteSize to fit.
 +     *
 +     * @param sstables the sstables to compact
 +     * @return
       */
 -    public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
 +    public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
      {
-         File directory = null;
 -        Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
 -        if (directory == null)
 -            throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
++        Descriptor descriptor = null;
 +        for (SSTableReader sstable : sstables)
 +        {
-             if (directory == null)
-                 directory = sstable.descriptor.directory;
-             if (!directory.equals(sstable.descriptor.directory))
++            if (descriptor == null)
++                descriptor = sstable.descriptor;
++            if (!descriptor.directory.equals(sstable.descriptor.directory))
 +            {
-                 logger.trace("All sstables not from the same disk - putting 
results in {}", directory);
++                logger.trace("All sstables not from the same disk - putting 
results in {}", descriptor.directory);
 +                break;
 +            }
 +        }
-         Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(directory);
++        Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(descriptor);
 +        if (d != null)
 +        {
 +            long availableSpace = d.getAvailableSpace();
 +            if (availableSpace < estimatedWriteSize)
 +                throw new RuntimeException(String.format("Not enough space to write %s to %s (%s available)",
 +                                                         FBUtilities.prettyPrintMemory(estimatedWriteSize),
 +                                                         d.location,
 +                                                         FBUtilities.prettyPrintMemory(availableSpace)));
-             logger.trace("putting compaction results in {}", directory);
++            logger.trace("putting compaction results in {}", descriptor.directory);
 +            return d;
 +        }
 +        d = getDirectories().getWriteableLocation(estimatedWriteSize);
 +        if (d == null)
 +            throw new RuntimeException(String.format("Not enough disk space to store %s",
 +                                                     FBUtilities.prettyPrintMemory(estimatedWriteSize)));
 +        return d;
 +    }
  
 -        return directory;
 +    public CompactionAwareWriter setRepairedAt(long repairedAt)
 +    {
 +        this.sstableWriter.setRepairedAt(repairedAt);
 +        return this;
      }
  }
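
The decision flow of the new getWriteDirectory() above, condensed: prefer the
disk the input sstables already live on (the first sstable wins if the inputs
span disks), fail fast if that disk lacks space, and only fall back to any
writeable location when the origin disk cannot be resolved. A hypothetical
condensed sketch (names are stand-ins, not the real Directories API):

    import java.util.List;
    import java.util.Map;

    public class WriteDirectorySketch
    {
        static String pickWriteDirectory(List<String> inputDisks,
                                         Map<String, Long> freeSpace,
                                         long estimatedWriteSize)
        {
            // Prefer the disk the inputs already live on.
            String preferred = inputDisks.isEmpty() ? null : inputDisks.get(0);
            if (preferred != null && freeSpace.containsKey(preferred))
            {
                // Fail fast rather than silently writing to another disk.
                if (freeSpace.get(preferred) < estimatedWriteSize)
                    throw new RuntimeException("Not enough space on " + preferred);
                return preferred;
            }
            // Origin unknown (e.g. file outside any data dir): any disk with room.
            return freeSpace.entrySet().stream()
                            .filter(e -> e.getValue() >= estimatedWriteSize)
                            .map(Map.Entry::getKey)
                            .findFirst()
                            .orElseThrow(() -> new RuntimeException("Not enough disk space"));
        }
    }
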
diff --cc test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 7e800e6,e4268f6..24e03f6
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@@ -25,6 -28,6 +28,7 @@@ import java.util.concurrent.Callable
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  
++import com.google.common.collect.Sets;
  import org.apache.commons.lang3.StringUtils;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
@@@ -471,6 -470,46 +475,150 @@@ public class DirectoriesTes
          }
      }
  
+     @Test
+     public void testGetLocationForDisk()
+     {
 -        DataDirectory [] paths = new DataDirectory[3];
 -        paths[0] = new DataDirectory(new File("/tmp/aaa"));
 -        paths[1] = new DataDirectory(new File("/tmp/aa"));
 -        paths[2] = new DataDirectory(new File("/tmp/a"));
++        Collection<DataDirectory> paths = new ArrayList<>();
++        paths.add(new DataDirectory(new File("/tmp/aaa")));
++        paths.add(new DataDirectory(new File("/tmp/aa")));
++        paths.add(new DataDirectory(new File("/tmp/a")));
+ 
+         for (CFMetaData cfm : CFM)
+         {
+             Directories dirs = new Directories(cfm, paths);
+             for (DataDirectory dir : paths)
+             {
+                 String p = dirs.getLocationForDisk(dir).getAbsolutePath() + File.separator;
+                 assertTrue(p.startsWith(dir.location.getAbsolutePath() + File.separator));
+             }
+         }
+     }
+ 
+     @Test
+     public void testGetLocationWithSymlinks() throws IOException
+     {
+         Path p = Files.createTempDirectory("something");
+         Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget"));
+         Path ddir = Files.createDirectories(p.resolve("datadir1"));
+ 
+         Path p1 = Files.createDirectories(ddir.resolve("p1").resolve("ks")).getParent(); // the data dir does not include the keyspace dir
+         Path p2 = Files.createDirectories(ddir.resolve("p2"));
+         Path l1 = Files.createSymbolicLink(p2.resolve("ks"), symlinktarget);
+ 
+         DataDirectory path1 = new DataDirectory(p1.toFile());
+         DataDirectory path2 = new DataDirectory(p2.toFile());
+         Directories dirs = new Directories(CFM.iterator().next(), new DataDirectory[] {path1, path2});
+         dirs.getLocationForDisk(new DataDirectory(p1.toFile()));
+         dirs.getLocationForDisk(new DataDirectory(p2.toFile()));
+ 
+         assertTrue(dirs.getLocationForDisk(path2).toPath().startsWith(l1));
+         assertTrue(dirs.getLocationForDisk(path1).toPath().startsWith(p1));
+     }
+ 
++    @Test
++    public void getDataDirectoryForFile()
++    {
++        Collection<DataDirectory> paths = new ArrayList<>();
++        paths.add(new DataDirectory(new File("/tmp/a")));
++        paths.add(new DataDirectory(new File("/tmp/aa")));
++        paths.add(new DataDirectory(new File("/tmp/aaa")));
++
++        for (CFMetaData cfm : CFM)
++        {
++            Directories dirs = new Directories(cfm, paths);
++            for (DataDirectory dir : paths)
++            {
++                Descriptor d = Descriptor.fromFilename(new File(dir.location, getNewFilename(cfm, false)).toString());
++                String p = dirs.getDataDirectoryForFile(d).location.getAbsolutePath() + File.separator;
++                assertTrue(p.startsWith(dir.location.getAbsolutePath() + File.separator));
++            }
++        }
++    }
++
++    /**
++     * Makes sure we can find the data directory when it is a symlink
++     *
++     * creates the following data directories:
++     * <tempdir something>/datadir1
++     * <tempdir something>/datadir11 (symlink to <tempdir 
something>/symlinktarget)
++     *
++     * and then makes sure that we get the correct directory back.
++     */
++    @Test
++    public void testDirectoriesSymlinks() throws IOException
++    {
++        Path p = Files.createTempDirectory("something");
++        Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget"));
++        Path ddir1 = Files.createDirectories(p.resolve("datadir1"));
++        Path ddir2 = Files.createSymbolicLink(p.resolve("datadir11"), symlinktarget);
++        DataDirectory dd1 = new DataDirectory(ddir1.toFile());
++        DataDirectory dd2 = new DataDirectory(ddir2.toFile());
++
++        for (CFMetaData tm : CFM)
++        {
++            Directories dirs = new Directories(tm, Sets.newHashSet(dd1, dd2));
++            Descriptor desc = Descriptor.fromFilename(ddir1.resolve(getNewFilename(tm, false)).toString());
++            assertEquals(ddir1.toFile(), dirs.getDataDirectoryForFile(desc).location);
++            desc = Descriptor.fromFilename(ddir2.resolve(getNewFilename(tm, false)).toString());
++            assertEquals(ddir2.toFile(), dirs.getDataDirectoryForFile(desc).location);
++        }
++    }
++
++    @Test
++    public void testDirectoriesOldTableSymlink() throws IOException
++    {
++        testDirectoriesSymlinksHelper(true);
++    }
++
++    @Test
++    public void testDirectoriesTableSymlink() throws IOException
++    {
++        testDirectoriesSymlinksHelper(false);
++    }
++
++    /**
++     * Makes sure we can find the data directory for a file when the table directory is a symlink
++     *
++     * if oldStyle is false we append the table id to the table directory
++     *
++     * creates the following structure
++     * <tempdir>/datadir1/<ks>/<table>
++     * <tempdir>/datadir11/<ks>/<table symlink to <tempdir>/symlinktarget>
++     *
++     * and then we create a fake descriptor to a file in the table directory and make sure we get the correct
++     * data directory back.
++     */
++    private void testDirectoriesSymlinksHelper(boolean oldStyle) throws IOException
++    {
++        Path p = Files.createTempDirectory("something");
++        Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget"));
++        Path ddir1 = Files.createDirectories(p.resolve("datadir1"));
++        Path ddir2 = Files.createDirectories(p.resolve("datadir11"));
++
++        for (CFMetaData metadata : CFM)
++        {
++            Path keyspacedir = Files.createDirectories(ddir2.resolve(metadata.ksName));
++            String tabledir = metadata.cfName + (oldStyle ? "" : Component.separator + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId)));
++            Files.createSymbolicLink(keyspacedir.resolve(tabledir), symlinktarget);
++        }
++
++        DataDirectory dd1 = new DataDirectory(ddir1.toFile());
++        DataDirectory dd2 = new DataDirectory(ddir2.toFile());
++        for (CFMetaData tm : CFM)
++        {
++            Directories dirs = new Directories(tm, Sets.newHashSet(dd1, dd2));
++            Descriptor desc = Descriptor.fromFilename(ddir1.resolve(getNewFilename(tm, oldStyle)).toFile().toString());
++            assertEquals(ddir1.toFile(), dirs.getDataDirectoryForFile(desc).location);
++            desc = Descriptor.fromFilename(ddir2.resolve(getNewFilename(tm, oldStyle)).toFile().toString());
++            assertEquals(ddir2.toFile(), dirs.getDataDirectoryForFile(desc).location);
++        }
++    }
++
++    private String getNewFilename(CFMetaData metadata, boolean oldStyle)
++    {
++        return metadata.ksName + File.separator + metadata.cfName + (oldStyle ? "" : Component.separator + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId))) + "/na-1-big-Data.db";
++    }
++
      private List<Directories.DataDirectoryCandidate> 
getWriteableDirectories(DataDirectory[] dataDirectories, long writeSize)
      {
          // copied from Directories.getWriteableLocation(long)
@@@ -492,4 -531,4 +640,5 @@@
  
          return candidates;
      }
++
  }
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index c654fcd,0000000..2120757
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@@ -1,290 -1,0 +1,290 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.io.File;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.io.Files;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.DiskBoundaries;
 +import org.apache.cassandra.db.DiskBoundaryManager;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.RowUpdateBuilder;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.notifications.SSTableAddedNotification;
 +import org.apache.cassandra.notifications.SSTableDeletingNotification;
 +import org.apache.cassandra.schema.CompactionParams;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.service.StorageService;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +public class CompactionStrategyManagerTest
 +{
 +    private static final String KS_PREFIX = "Keyspace1";
 +    private static final String TABLE_PREFIX = "CF_STANDARD";
 +
 +    private static IPartitioner originalPartitioner;
 +    private static boolean backups;
 +
 +    @BeforeClass
 +    public static void beforeClass()
 +    {
 +        SchemaLoader.prepareServer();
 +        backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
 +        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
 +        /**
 +         * We use byte ordered partitioner in this test to be able to easily infer an SSTable
 +         * disk assignment based on its generation - See {@link this#getSSTableIndex(Integer[], SSTableReader)}
 +         */
 +        originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 +    }
 +
 +    @AfterClass
 +    public static void afterClass()
 +    {
 +        DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
 +        DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
 +    }
 +
 +    @Test
 +    public void testSSTablesAssignedToCorrectCompactionStrategy()
 +    {
 +        // Creates 100 SSTables with keys 0-99
 +        int numSSTables = 100;
 +        SchemaLoader.createKeyspace(KS_PREFIX,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
 +                                                .compaction(CompactionParams.scts(Collections.emptyMap())));
 +        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
 +        cfs.disableAutoCompaction();
 +        for (int i = 0; i < numSSTables; i++)
 +        {
 +            createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
 +        }
 +
 +        // Creates a CompactionStrategyManager with different numbers of disks and checks
 +        // if the SSTables are assigned to the correct compaction strategies
 +        for (int numDisks = 2; numDisks < 10; numDisks++)
 +        {
 +            testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
 +        }
 +    }
 +
 +    public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
 +    {
 +        // Create a mock CFS with the given number of disks
 +        MockCFS cfs = createJBODMockCFS(numDisks);
 +        //Check that CFS will contain numSSTables
 +        assertEquals(numSSTables, cfs.getLiveSSTables().size());
 +
 +        // Creates a compaction strategy manager with an external boundary supplier
 +        final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
 +
 +        MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
 +        System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
 +        CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
 +                                                                      true);
 +
 +        // Check that SSTables are assigned to the correct Compaction Strategy
 +        for (SSTableReader reader : cfs.getLiveSSTables())
 +        {
 +            verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
 +        }
 +
 +        for (int delta = 1; delta <= 3; delta++)
 +        {
 +            // Update disk boundaries
 +            Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
 +            updateBoundaries(mockBoundaryManager, boundaries, delta);
 +
 +            // Check that SSTables are still assigned to the previous boundary layout
 +            System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
 +            for (SSTableReader reader : cfs.getLiveSSTables())
 +            {
 +                verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
 +            }
 +
 +            // Reload CompactionStrategyManager so new disk boundaries will be loaded
 +            csm.maybeReloadDiskBoundaries();
 +
 +            for (SSTableReader reader : cfs.getLiveSSTables())
 +            {
 +                // Check that SSTables are assigned to the new boundary layout
 +                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
 +
 +                // Remove SSTable and check that it will be removed from the correct compaction strategy
 +                csm.handleNotification(new SSTableDeletingNotification(reader), this);
 +                assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
 +
 +                // Add SSTable again and check that is correctly assigned
 +                csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
 +                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
 +            }
 +        }
 +    }
 +
 +    private MockCFS createJBODMockCFS(int disks)
 +    {
 +        // Create #disks data directories to simulate JBOD
 +        Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
 +        for (int i = 0; i < disks; ++i)
 +        {
 +            File tempDir = Files.createTempDir();
 +            tempDir.deleteOnExit();
 +            directories[i] = new Directories.DataDirectory(tempDir);
 +        }
 +
 +        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
 +        MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
 +        mockCFS.disableAutoCompaction();
 +        mockCFS.addSSTables(cfs.getLiveSSTables());
 +        return mockCFS;
 +    }
 +
 +    /**
 +     * Updates the boundaries with a delta
 +     */
 +    private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
 +    {
 +        for (int j = 0; j < boundaries.length - 1; j++)
 +        {
 +            if ((j + delta) % 2 == 0)
 +                boundaries[j] -= delta;
 +            else
 +                boundaries[j] += delta;
 +        }
 +        boundaryManager.invalidateBoundaries();
 +    }
 +
 +    private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
 +    {
 +        // Check that sstable is assigned to correct disk
 +        int index = getSSTableIndex(boundaries, reader);
 +        assertEquals(index, csm.compactionStrategyIndexFor(reader));
 +        // Check that compaction strategy actually contains SSTable
 +        assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
 +    }
 +
 +    /**
 +     * Creates disk boundaries such that each disk receives
 +     * an equal amount of SSTables
 +     */
 +    private Integer[] computeBoundaries(int numSSTables, int numDisks)
 +    {
 +        Integer[] result = new Integer[numDisks];
 +        int sstablesPerRange = numSSTables / numDisks;
 +        result[0] = sstablesPerRange;
 +        for (int i = 1; i < numDisks; i++)
 +        {
 +            result[i] = result[i - 1] + sstablesPerRange;
 +        }
 +        result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
 +        return result;
 +    }
 +
 +    /**
 +     * Since each SSTable contains keys from 0-99, and each sstable
 +     * generation is numbered from 1-100, since we are using ByteOrderedPartitioner
 +     * we can compute the sstable position in the disk boundaries by finding
 +     * the generation position relative to the boundaries
 +     */
 +    private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
 +    {
 +        int index = 0;
 +        while (boundaries[index] < reader.descriptor.generation)
 +            index++;
 +        System.out.println("Index for SSTable " + 
reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + 
" is " + index);
 +        return index;
 +    }
 +
 +
 +
 +    class MockBoundaryManager
 +    {
 +        private final ColumnFamilyStore cfs;
 +        private Integer[] positions;
 +        private DiskBoundaries boundaries;
 +
 +        public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
 +        {
 +            this.cfs = cfs;
 +            this.positions = positions;
 +            this.boundaries = createDiskBoundaries(cfs, positions);
 +        }
 +
 +        public void invalidateBoundaries()
 +        {
 +            boundaries.invalidate();
 +        }
 +
 +        public DiskBoundaries getBoundaries()
 +        {
 +            if (boundaries.isOutOfDate())
 +                boundaries = createDiskBoundaries(cfs, positions);
 +            return boundaries;
 +        }
 +
 +        private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
 +        {
 +            List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
-             return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
++            return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
 +        }
 +    }
 +
 +    private static void createSSTableWithKey(String keyspace, String table, int key)
 +    {
 +        long timestamp = System.currentTimeMillis();
 +        DecoratedKey dk = Util.dk(String.format("%04d", key));
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
 +        .clustering(Integer.toString(key))
 +        .add("val", "val")
 +        .build()
 +        .applyUnsafe();
 +        cfs.forceBlockingFlush();
 +    }
 +
 +    // just to be able to override the data directories
 +    private static class MockCFS extends ColumnFamilyStore
 +    {
 +        MockCFS(ColumnFamilyStore cfs, Directories dirs)
 +        {
 +            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
 +        }
 +    }
 +}
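
The arithmetic the test relies on, worked through for 100 sstables on 3
disks: sstablesPerRange = 33, giving boundaries [33, 66, 99], with the last
entry then pinned to 100; an sstable with generation 34 therefore lands at
index 1. A runnable restatement:

    public class BoundaryArithmeticSketch
    {
        public static void main(String[] args)
        {
            int numSSTables = 100, numDisks = 3;
            Integer[] boundaries = new Integer[numDisks];
            int perRange = numSSTables / numDisks;            // 33
            boundaries[0] = perRange;
            for (int i = 1; i < numDisks; i++)
                boundaries[i] = boundaries[i - 1] + perRange; // 66, 99
            boundaries[numDisks - 1] = numSSTables;           // pin to 100

            int generation = 34, index = 0;
            while (boundaries[index] < generation)
                index++;
            System.out.println(index); // 1 -> second disk/strategy
        }
    }
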
diff --cc test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
index 0e7d8c8,8d1b752..2f9ccd4
--- a/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
+++ b/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
@@@ -20,19 -20,13 +20,20 @@@ package org.apache.cassandra.io.util
  
  import java.io.File;
  import java.io.IOException;
 +import java.io.RandomAccessFile;
  import java.nio.charset.Charset;
  import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.nio.file.Paths;
 +import java.util.Arrays;
  
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
  
  public class FileUtilsTest
@@@ -66,45 -54,12 +67,55 @@@
      }
  
      @Test
 +    public void testFolderSize() throws Exception
 +    {
 +        File folder = createFolder(Paths.get(DatabaseDescriptor.getAllDataFileLocations()[0], "testFolderSize"));
 +        folder.deleteOnExit();
 +
 +        File childFolder = createFolder(Paths.get(folder.getPath(), "child"));
 +
 +        File[] files = {
 +                       createFile(new File(folder, "001"), 10000),
 +                       createFile(new File(folder, "002"), 1000),
 +                       createFile(new File(folder, "003"), 100),
 +                       createFile(new File(childFolder, "001"), 1000),
 +                       createFile(new File(childFolder, "002"), 2000),
 +        };
 +
 +        assertEquals(0, FileUtils.folderSize(new File(folder, "i_dont_exist")));
 +        assertEquals(files[0].length(), FileUtils.folderSize(files[0]));
 +
 +        long size = FileUtils.folderSize(folder);
 +        assertEquals(Arrays.stream(files).mapToLong(f -> f.length()).sum(), size);
 +    }
 +
++    @Test
+     public void testIsContained()
+     {
+         assertTrue(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abc")));
+         assertFalse(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abcd")));
+         assertTrue(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abc/d")));
+         assertTrue(FileUtils.isContained(new File("/tmp/abc/../abc"), new File("/tmp/abc/d")));
+         assertFalse(FileUtils.isContained(new File("/tmp/abc/../abc"), new File("/tmp/abcc")));
+     }
++
 +    private File createFolder(Path path)
 +    {
 +        File folder = path.toFile();
 +        FileUtils.createDirectory(folder);
 +        return folder;
 +    }
 +
 +    private File createFile(File file, long size)
 +    {
 +        try (RandomAccessFile f = new RandomAccessFile(file, "rw"))
 +        {
 +            f.setLength(size);
 +        }
 +        catch (Exception e)
 +        {
 +            System.err.println(e);
 +        }
 +        return file;
 +    }
  }
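
The FileUtils.java change itself appears in the diffstat at the top but not
in the combined diff below it, which is how git renders hunks that merged
cleanly; the testIsContained() assertions above pin down its contract. A
hypothetical equivalent of that contract, comparing canonicalized paths
component-wise rather than by string prefix:

    import java.io.File;
    import java.io.IOException;

    public class IsContainedSketch
    {
        static boolean isContained(File folder, File file) throws IOException
        {
            // Canonicalize so "/tmp/abc/../abc" equals "/tmp/abc", then compare
            // path components so "/tmp/abc" never contains "/tmp/abcd".
            return file.getCanonicalFile().toPath()
                       .startsWith(folder.getCanonicalFile().toPath());
        }

        public static void main(String[] args) throws IOException
        {
            System.out.println(isContained(new File("/tmp/abc"), new File("/tmp/abcd")));         // false
            System.out.println(isContained(new File("/tmp/abc/../abc"), new File("/tmp/abc/d"))); // true
        }
    }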

