This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit df4180f5782ce7530dc30448a17a4f4dbe4fae44 Merge: 5d930cc eda5db2 Author: Marcus Eriksson <[email protected]> AuthorDate: Fri Nov 29 09:53:37 2019 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Directories.java | 50 ++++--- .../org/apache/cassandra/db/DiskBoundaries.java | 11 +- .../apache/cassandra/db/DiskBoundaryManager.java | 6 +- .../cassandra/db/compaction/CompactionManager.java | 4 +- .../compaction/writers/CompactionAwareWriter.java | 15 ++- .../org/apache/cassandra/io/util/FileUtils.java | 4 +- .../org/apache/cassandra/db/DirectoriesTest.java | 149 +++++++++++++++++++++ .../compaction/CompactionStrategyManagerTest.java | 2 +- .../apache/cassandra/io/util/FileUtilsTest.java | 11 ++ 10 files changed, 214 insertions(+), 39 deletions(-) diff --cc CHANGES.txt index 918eb1b,cdebc03..3ede6a4 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,7 -1,5 +1,8 @@@ -3.0.20 +3.11.6 + * Fix SELECT JSON formatting for the "duration" type (CASSANDRA-15075) + * Fix LegacyLayout to have same behavior as 2.x when handling unknown column names (CASSANDRA-15081) +Merged from 3.0: + * Fix various data directory prefix matching issues (CASSANDRA-13974) * Minimize clustering values in metadata collector (CASSANDRA-15400) * Avoid over-trimming of results in mixed mode clusters (CASSANDRA-15405) * validate value sizes in LegacyLayout (CASSANDRA-15373) diff --cc src/java/org/apache/cassandra/db/Directories.java index 532bf98,b104509..4731b3e --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@@ -21,11 -23,17 +21,12 @@@ import java.io.File import java.io.FileFilter; import java.io.IOError; import java.io.IOException; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; + import java.nio.file.Paths; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.*; import 
java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; -import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; @@@ -177,6 -185,6 +178,7 @@@ public class Directorie private final CFMetaData metadata; private final DataDirectory[] paths; private final File[] dataPaths; ++ private final ImmutableMap<Path, DataDirectory> canonicalPathToDD; public Directories(final CFMetaData metadata) { @@@ -199,6 -201,6 +201,8 @@@ this.metadata = metadata; this.paths = paths; ++ ImmutableMap.Builder<Path, DataDirectory> canonicalPathsBuilder = ImmutableMap.builder(); ++ String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId)); int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR); String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName; @@@ -210,27 -212,27 +214,33 @@@ for (int i = 0; i < paths.length; ++i) { // check if old SSTable directory exists -- dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath); ++ File dataPath = new File(paths[i].location, oldSSTableRelativePath); ++ dataPaths[i] = dataPath; ++ canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]); } -- boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>() -- { -- public boolean apply(File file) -- { -- return file.exists(); -- } -- }); ++ boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), File::exists); if (!olderDirectoryExists) { ++ canonicalPathsBuilder = ImmutableMap.builder(); // use 2.1+ style String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId); for (int i = 0; i < paths.length; ++i) -- dataPaths[i] = new File(paths[i].location, newSSTableRelativePath); ++ { ++ File dataPath = new File(paths[i].location, newSSTableRelativePath);; ++ dataPaths[i] = dataPath; ++ 
canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]); ++ } } // if index, then move to its own directory if (indexNameWithDot != null) { ++ canonicalPathsBuilder = ImmutableMap.builder(); for (int i = 0; i < paths.length; ++i) -- dataPaths[i] = new File(dataPaths[i], indexNameWithDot); ++ { ++ File dataPath = new File(dataPaths[i], indexNameWithDot); ++ dataPaths[i] = dataPath; ++ canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]); ++ } } for (File dir : dataPaths) @@@ -274,6 -276,6 +284,7 @@@ } } } ++ canonicalPathToDD = canonicalPathsBuilder.build(); } /** @@@ -291,19 -298,6 +307,13 @@@ return null; } - public DataDirectory getDataDirectoryForFile(File directory) ++ public DataDirectory getDataDirectoryForFile(Descriptor descriptor) + { - if (directory != null) - { - for (DataDirectory dataDirectory : paths) - { - if (directory.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath())) - return dataDirectory; - } - } ++ if (descriptor != null) ++ return canonicalPathToDD.get(descriptor.directory.toPath()); + return null; + } + public Descriptor find(String filename) { for (File dir : dataPaths) diff --cc src/java/org/apache/cassandra/db/DiskBoundaries.java index 7bfed28,0000000..90af893 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaries.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java @@@ -1,131 -1,0 +1,134 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.Collections; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.service.StorageService; + +public class DiskBoundaries +{ + public final List<Directories.DataDirectory> directories; + public final ImmutableList<PartitionPosition> positions; + final long ringVersion; + final int directoriesVersion; ++ private final ColumnFamilyStore cfs; + private volatile boolean isInvalid = false; + - public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion) ++ public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, int diskVersion) + { - this(directories, null, -1, diskVersion); ++ this(cfs, directories, null, -1, diskVersion); + } + + @VisibleForTesting - public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion) ++ public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion) + { + this.directories = directories == null ? null : ImmutableList.copyOf(directories); + this.positions = positions == null ? 
null : ImmutableList.copyOf(positions); + this.ringVersion = ringVersion; + this.directoriesVersion = diskVersion; ++ this.cfs = cfs; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DiskBoundaries that = (DiskBoundaries) o; + + if (ringVersion != that.ringVersion) return false; + if (directoriesVersion != that.directoriesVersion) return false; + if (!directories.equals(that.directories)) return false; + return positions != null ? positions.equals(that.positions) : that.positions == null; + } + + public int hashCode() + { + int result = directories != null ? directories.hashCode() : 0; + result = 31 * result + (positions != null ? positions.hashCode() : 0); + result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32)); + result = 31 * result + directoriesVersion; + return result; + } + + public String toString() + { + return "DiskBoundaries{" + + "directories=" + directories + + ", positions=" + positions + + ", ringVersion=" + ringVersion + + ", directoriesVersion=" + directoriesVersion + + '}'; + } + + /** + * check if the given disk boundaries are out of date due not being set or to having too old diskVersion/ringVersion + */ + public boolean isOutOfDate() + { + if (isInvalid) + return true; + int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion(); + long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion(); + return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion); + } + + public void invalidate() + { + this.isInvalid = true; + } + + public int getDiskIndex(SSTableReader sstable) + { + if (positions == null) + { + return getBoundariesFromSSTableDirectory(sstable); + } + + int pos = Collections.binarySearch(positions, sstable.first); + assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal + return -pos - 1; + } + + /** + * Try to 
figure out location based on sstable directory + */ + private int getBoundariesFromSSTableDirectory(SSTableReader sstable) + { ++ Directories.DataDirectory actualDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor); + for (int i = 0; i < directories.size(); i++) + { + Directories.DataDirectory directory = directories.get(i); - if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath())) ++ if (actualDirectory != null && actualDirectory.equals(directory)) + return i; + } + return 0; + } + + public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable) + { + return directories.get(getDiskIndex(sstable)); + } +} diff --cc src/java/org/apache/cassandra/db/DiskBoundaryManager.java index 03cbf7b,0000000..61febe9 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@@ -1,140 -1,0 +1,140 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Splitter; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +public class DiskBoundaryManager +{ + private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class); + private volatile DiskBoundaries diskBoundaries; + + public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs) + { + if (!cfs.getPartitioner().splitter().isPresent()) - return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion()); ++ return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion()); + if (diskBoundaries == null || diskBoundaries.isOutOfDate()) + { + synchronized (this) + { + if (diskBoundaries == null || diskBoundaries.isOutOfDate()) + { + logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName()); + DiskBoundaries oldBoundaries = diskBoundaries; + diskBoundaries = getDiskBoundaryValue(cfs); + logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName()); + } + } + } + return diskBoundaries; + } + + public void invalidate() + { + if (diskBoundaries != null) + diskBoundaries.invalidate(); + } + + private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs) + { + Collection<Range<Token>> localRanges; + + long ringVersion; + TokenMetadata tmd; + do + { 
+ tmd = StorageService.instance.getTokenMetadata(); + ringVersion = tmd.getRingVersion(); + if (StorageService.instance.isBootstrapMode() + && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally + { + PendingRangeCalculatorService.instance.blockUntilFinished(); + localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress()); + } + else + { + // Reason we use use the future settled TMD is that if we decommission a node, we want to stream + // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. + // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled + localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress()); + } + logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion); + } + while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that + // it might have changed before we calculated localRanges - recalculate + + int directoriesVersion; + Directories.DataDirectory[] dirs; + do + { + directoriesVersion = BlacklistedDirectories.getDirectoriesVersion(); + dirs = cfs.getDirectories().getWriteableLocations(); + } + while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate + + if (localRanges == null || localRanges.isEmpty()) - return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion); ++ return new DiskBoundaries(cfs, dirs, null, ringVersion, directoriesVersion); + + List<Range<Token>> sortedLocalRanges = Range.sort(localRanges); + + List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs); - return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion); ++ return new DiskBoundaries(cfs, dirs, 
positions, ringVersion, directoriesVersion); + } + + /** + * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not. + * + * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to + * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk + * etc. + * + * The final entry in the returned list will always be the partitioner maximum tokens upper key bound + */ + private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) + { + assert partitioner.splitter().isPresent(); + Splitter splitter = partitioner.splitter().get(); + boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1; + List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges); + // If we can't split by ranges, split evenly to ensure utilisation of all disks + if (dontSplitRanges && boundaries.size() < dataDirectories.length) + boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false); + + List<PartitionPosition> diskBoundaries = new ArrayList<>(); + for (int i = 0; i < boundaries.size() - 1; i++) + diskBoundaries.add(boundaries.get(i).maxKeyBound()); + diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound()); + return diskBoundaries; + } +} diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a08d08b,2b9ee50..56d2d29 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -503,115 -499,6 +503,115 @@@ public class CompactionManager implemen }, jobs, OperationType.CLEANUP); } + public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, 
ExecutionException + { + assert !cfStore.isIndex(); + + return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) + { + Iterable<SSTableReader> originals = transaction.originals(); + if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones()) + originals = Iterables.filter(originals, SSTableReader::isRepaired); + List<SSTableReader> sortedSSTables = Lists.newArrayList(originals); + Collections.sort(sortedSSTables, SSTableReader.maxTimestampAscending); + return sortedSSTables; + } + + @Override + public void execute(LifecycleTransaction txn) throws IOException + { + logger.debug("Garbage collecting {}", txn.originals()); + CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds())) + { + @Override + protected CompactionController getCompactionController(Set<SSTableReader> toCompact) + { + return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption); + } + }; + task.setUserDefined(true); + task.setCompactionType(OperationType.GARBAGE_COLLECT); + task.execute(metrics); + } + }, jobs, OperationType.GARBAGE_COLLECT); + } + + public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException + { + if (!cfs.getPartitioner().splitter().isPresent()) + { + logger.info("Partitioner does not support splitting"); + return AllSSTableOpStatus.ABORTED; + } + final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName()); + + if (r.isEmpty()) + { + logger.info("Relocate cannot run before a node has joined the ring"); + return AllSSTableOpStatus.ABORTED; + } + + final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries(); + + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) + 
{ + Set<SSTableReader> originals = Sets.newHashSet(transaction.originals()); + Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet()); + transaction.cancel(Sets.difference(originals, needsRelocation)); + + Map<Integer, List<SSTableReader>> groupedByDisk = groupByDiskIndex(needsRelocation); + + int maxSize = 0; + for (List<SSTableReader> diskSSTables : groupedByDisk.values()) + maxSize = Math.max(maxSize, diskSSTables.size()); + + List<SSTableReader> mixedSSTables = new ArrayList<>(); + + for (int i = 0; i < maxSize; i++) + for (List<SSTableReader> diskSSTables : groupedByDisk.values()) + if (i < diskSSTables.size()) + mixedSSTables.add(diskSSTables.get(i)); + + return mixedSSTables; + } + + public Map<Integer, List<SSTableReader>> groupByDiskIndex(Set<SSTableReader> needsRelocation) + { + return needsRelocation.stream().collect(Collectors.groupingBy((s) -> diskBoundaries.getDiskIndex(s))); + } + + private boolean inCorrectLocation(SSTableReader sstable) + { + if (!cfs.getPartitioner().splitter().isPresent()) + return true; + + int diskIndex = diskBoundaries.getDiskIndex(sstable); - File diskLocation = diskBoundaries.directories.get(diskIndex).location; + PartitionPosition diskLast = diskBoundaries.positions.get(diskIndex); + + // the location we get from directoryIndex is based on the first key in the sstable + // now we need to make sure the last key is less than the boundary as well: - return sstable.descriptor.directory.getAbsolutePath().startsWith(diskLocation.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0; ++ Directories.DataDirectory dataDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor); ++ return diskBoundaries.directories.get(diskIndex).equals(dataDirectory) && sstable.last.compareTo(diskLast) <= 0; + } + + @Override + public void execute(LifecycleTransaction txn) + { + logger.debug("Relocating {}", txn.originals()); + AbstractCompactionTask task = 
cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); + task.setUserDefined(true); + task.setCompactionType(OperationType.RELOCATE); + task.execute(metrics); + } + }, jobs, OperationType.RELOCATE); + } + /** * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables * as repaired. diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index d2f816b,d33d72c..bc6115e --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@@ -34,11 -27,9 +34,12 @@@ import org.apache.cassandra.db.Partitio import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; ++import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Transactional; +import org.apache.cassandra.db.compaction.OperationType; /** @@@ -197,45 -147,13 +198,45 @@@ public abstract class CompactionAwareWr /** * Return a directory where we can expect expectedWriteSize to fit. 
+ * + * @param sstables the sstables to compact + * @return */ - public Directories.DataDirectory getWriteDirectory(long expectedWriteSize) + public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize) { - File directory = null; - Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize); - if (directory == null) - throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes"); ++ Descriptor descriptor = null; + for (SSTableReader sstable : sstables) + { - if (directory == null) - directory = sstable.descriptor.directory; - if (!directory.equals(sstable.descriptor.directory)) ++ if (descriptor == null) ++ descriptor = sstable.descriptor; ++ if (!descriptor.directory.equals(sstable.descriptor.directory)) + { - logger.trace("All sstables not from the same disk - putting results in {}", directory); ++ logger.trace("All sstables not from the same disk - putting results in {}", descriptor.directory); + break; + } + } - Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(directory); ++ Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(descriptor); + if (d != null) + { + long availableSpace = d.getAvailableSpace(); + if (availableSpace < estimatedWriteSize) + throw new RuntimeException(String.format("Not enough space to write %s to %s (%s available)", + FBUtilities.prettyPrintMemory(estimatedWriteSize), + d.location, + FBUtilities.prettyPrintMemory(availableSpace))); - logger.trace("putting compaction results in {}", directory); ++ logger.trace("putting compaction results in {}", descriptor.directory); + return d; + } + d = getDirectories().getWriteableLocation(estimatedWriteSize); + if (d == null) + throw new RuntimeException(String.format("Not enough disk space to store %s", + FBUtilities.prettyPrintMemory(estimatedWriteSize))); + return d; + } - return directory; + public CompactionAwareWriter setRepairedAt(long 
repairedAt) + { + this.sstableWriter.setRepairedAt(repairedAt); + return this; } } diff --cc test/unit/org/apache/cassandra/db/DirectoriesTest.java index 7e800e6,e4268f6..24e03f6 --- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java +++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java @@@ -25,6 -28,6 +28,7 @@@ import java.util.concurrent.Callable import java.util.concurrent.Executors; import java.util.concurrent.Future; ++import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.junit.AfterClass; import org.junit.BeforeClass; @@@ -471,6 -470,46 +475,150 @@@ public class DirectoriesTes } } + @Test + public void testGetLocationForDisk() + { - DataDirectory [] paths = new DataDirectory[3]; - paths[0] = new DataDirectory(new File("/tmp/aaa")); - paths[1] = new DataDirectory(new File("/tmp/aa")); - paths[2] = new DataDirectory(new File("/tmp/a")); ++ Collection<DataDirectory> paths = new ArrayList<>(); ++ paths.add(new DataDirectory(new File("/tmp/aaa"))); ++ paths.add(new DataDirectory(new File("/tmp/aa"))); ++ paths.add(new DataDirectory(new File("/tmp/a"))); + + for (CFMetaData cfm : CFM) + { + Directories dirs = new Directories(cfm, paths); + for (DataDirectory dir : paths) + { + String p = dirs.getLocationForDisk(dir).getAbsolutePath() + File.separator; + assertTrue(p.startsWith(dir.location.getAbsolutePath() + File.separator)); + } + } + } + + @Test + public void testGetLocationWithSymlinks() throws IOException + { + Path p = Files.createTempDirectory("something"); + Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget")); + Path ddir = Files.createDirectories(p.resolve("datadir1")); + + Path p1 = Files.createDirectories(ddir.resolve("p1").resolve("ks")).getParent(); // the data dir does not include the keyspace dir + Path p2 = Files.createDirectories(ddir.resolve("p2")); + Path l1 = Files.createSymbolicLink(p2.resolve("ks"), symlinktarget); + + DataDirectory path1 = new 
DataDirectory(p1.toFile()); + DataDirectory path2 = new DataDirectory(p2.toFile()); + Directories dirs = new Directories(CFM.iterator().next(), new DataDirectory[] {path1, path2}); + dirs.getLocationForDisk(new DataDirectory(p1.toFile())); + dirs.getLocationForDisk(new DataDirectory(p2.toFile())); + + assertTrue(dirs.getLocationForDisk(path2).toPath().startsWith(l1)); + assertTrue(dirs.getLocationForDisk(path1).toPath().startsWith(p1)); + } + ++ @Test ++ public void getDataDirectoryForFile() ++ { ++ Collection<DataDirectory> paths = new ArrayList<>(); ++ paths.add(new DataDirectory(new File("/tmp/a"))); ++ paths.add(new DataDirectory(new File("/tmp/aa"))); ++ paths.add(new DataDirectory(new File("/tmp/aaa"))); ++ ++ for (CFMetaData cfm : CFM) ++ { ++ Directories dirs = new Directories(cfm, paths); ++ for (DataDirectory dir : paths) ++ { ++ Descriptor d = Descriptor.fromFilename(new File(dir.location, getNewFilename(cfm, false)).toString()); ++ String p = dirs.getDataDirectoryForFile(d).location.getAbsolutePath() + File.separator; ++ assertTrue(p.startsWith(dir.location.getAbsolutePath() + File.separator)); ++ } ++ } ++ } ++ ++ /** ++ * Makes sure we can find the data directory when it is a symlink ++ * ++ * creates the following data directories: ++ * <tempdir something>/datadir1 ++ * <tempdir something>/datadir11 (symlink to <tempdir something>/symlinktarget) ++ * ++ * and then makes sure that we get the correct directory back. 
++ */ ++ @Test ++ public void testDirectoriesSymlinks() throws IOException ++ { ++ Path p = Files.createTempDirectory("something"); ++ Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget")); ++ Path ddir1 = Files.createDirectories(p.resolve("datadir1")); ++ Path ddir2 = Files.createSymbolicLink(p.resolve("datadir11"), symlinktarget); ++ DataDirectory dd1 = new DataDirectory(ddir1.toFile()); ++ DataDirectory dd2 = new DataDirectory(ddir2.toFile()); ++ ++ for (CFMetaData tm : CFM) ++ { ++ Directories dirs = new Directories(tm, Sets.newHashSet(dd1, dd2)); ++ Descriptor desc = Descriptor.fromFilename(ddir1.resolve(getNewFilename(tm, false)).toString()); ++ assertEquals(ddir1.toFile(), dirs.getDataDirectoryForFile(desc).location); ++ desc = Descriptor.fromFilename(ddir2.resolve(getNewFilename(tm, false)).toString()); ++ assertEquals(ddir2.toFile(), dirs.getDataDirectoryForFile(desc).location); ++ } ++ } ++ ++ @Test ++ public void testDirectoriesOldTableSymlink() throws IOException ++ { ++ testDirectoriesSymlinksHelper(true); ++ } ++ ++ @Test ++ public void testDirectoriesTableSymlink() throws IOException ++ { ++ testDirectoriesSymlinksHelper(false); ++ } ++ ++ /** ++ * Makes sure we can find the data directory for a file when the table directory is a symlink ++ * ++ * if oldStyle is false we append the table id to the table directory ++ * ++ * creates the following structure ++ * <tempdir>/datadir1/<ks>/<table> ++ * <tempdir>/datadir11/<ks>/<table symlink to <tempdir>/symlinktarget> ++ * ++ * and then we create a fake descriptor to a file in the table directory and make sure we get the correct ++ * data directory back. 
++ */ ++ private void testDirectoriesSymlinksHelper(boolean oldStyle) throws IOException ++ { ++ Path p = Files.createTempDirectory("something"); ++ Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget")); ++ Path ddir1 = Files.createDirectories(p.resolve("datadir1")); ++ Path ddir2 = Files.createDirectories(p.resolve("datadir11")); ++ ++ for (CFMetaData metadata : CFM) ++ { ++ Path keyspacedir = Files.createDirectories(ddir2.resolve(metadata.ksName)); ++ String tabledir = metadata.cfName + (oldStyle ? "" : Component.separator + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId))); ++ Files.createSymbolicLink(keyspacedir.resolve(tabledir), symlinktarget); ++ } ++ ++ DataDirectory dd1 = new DataDirectory(ddir1.toFile()); ++ DataDirectory dd2 = new DataDirectory(ddir2.toFile()); ++ for (CFMetaData tm : CFM) ++ { ++ Directories dirs = new Directories(tm, Sets.newHashSet(dd1, dd2)); ++ Descriptor desc = Descriptor.fromFilename(ddir1.resolve(getNewFilename(tm, oldStyle)).toFile().toString()); ++ assertEquals(ddir1.toFile(), dirs.getDataDirectoryForFile(desc).location); ++ desc = Descriptor.fromFilename(ddir2.resolve(getNewFilename(tm, oldStyle)).toFile().toString()); ++ assertEquals(ddir2.toFile(), dirs.getDataDirectoryForFile(desc).location); ++ } ++ } ++ ++ private String getNewFilename(CFMetaData metadata, boolean oldStyle) ++ { ++ return metadata.ksName + File.separator + metadata.cfName + (oldStyle ? 
"" : Component.separator + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId))) + "/na-1-big-Data.db"; ++ } ++ private List<Directories.DataDirectoryCandidate> getWriteableDirectories(DataDirectory[] dataDirectories, long writeSize) { // copied from Directories.getWriteableLocation(long) @@@ -492,4 -531,4 +640,5 @@@ return candidates; } ++ } diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java index c654fcd,0000000..2120757 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java @@@ -1,290 -1,0 +1,290 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.io.Files; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.DiskBoundaries; +import org.apache.cassandra.db.DiskBoundaryManager; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.notifications.SSTableAddedNotification; +import org.apache.cassandra.notifications.SSTableDeletingNotification; +import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.StorageService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CompactionStrategyManagerTest +{ + private static final String KS_PREFIX = "Keyspace1"; + private static final String TABLE_PREFIX = "CF_STANDARD"; + + private static IPartitioner originalPartitioner; + private static boolean backups; + + @BeforeClass + public static void beforeClass() + { + SchemaLoader.prepareServer(); + backups = DatabaseDescriptor.isIncrementalBackupsEnabled(); + DatabaseDescriptor.setIncrementalBackupsEnabled(false); + /** + * We use byte ordered partitioner in this test to be able to easily infer an SSTable + * 
disk assignment based on its generation - See {@link this#getSSTableIndex(Integer[], SSTableReader)} + */ + originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance); + } + + @AfterClass + public static void afterClass() + { + DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner); + DatabaseDescriptor.setIncrementalBackupsEnabled(backups); + } + + @Test + public void testSSTablesAssignedToCorrectCompactionStrategy() + { + // Creates 100 SSTables with keys 0-99 + int numSSTables = 100; + SchemaLoader.createKeyspace(KS_PREFIX, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX) + .compaction(CompactionParams.scts(Collections.emptyMap()))); + ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); + cfs.disableAutoCompaction(); + for (int i = 0; i < numSSTables; i++) + { + createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i); + } + + // Creates a CompactionStrategyManager with different numbers of disks and checks + // if the SSTables are assigned to the correct compaction strategies + for (int numDisks = 2; numDisks < 10; numDisks++) + { + testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks); + } + } + + public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks) + { + // Create a mock CFS with the given number of disks + MockCFS cfs = createJBODMockCFS(numDisks); + //Check that CFS will contain numSSTables + assertEquals(numSSTables, cfs.getLiveSSTables().size()); + + // Creates a compaction strategy manager with an external boundary supplier + final Integer[] boundaries = computeBoundaries(numSSTables, numDisks); + + MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries); + System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries)); + CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries, + true); + + // 
Check that SSTables are assigned to the correct Compaction Strategy + for (SSTableReader reader : cfs.getLiveSSTables()) + { + verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader); + } + + for (int delta = 1; delta <= 3; delta++) + { + // Update disk boundaries + Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length); + updateBoundaries(mockBoundaryManager, boundaries, delta); + + // Check that SSTables are still assigned to the previous boundary layout + System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries)); + for (SSTableReader reader : cfs.getLiveSSTables()) + { + verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader); + } + + // Reload CompactionStrategyManager so new disk boundaries will be loaded + csm.maybeReloadDiskBoundaries(); + + for (SSTableReader reader : cfs.getLiveSSTables()) + { + // Check that SSTables are assigned to the new boundary layout + verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader); + + // Remove SSTable and check that it will be removed from the correct compaction strategy + csm.handleNotification(new SSTableDeletingNotification(reader), this); + assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader)); + + // Add SSTable again and check that is correctly assigned + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this); + verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader); + } + } + } + + private MockCFS createJBODMockCFS(int disks) + { + // Create #disks data directories to simulate JBOD + Directories.DataDirectory[] directories = new Directories.DataDirectory[disks]; + for (int i = 0; i < disks; ++i) + { + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + directories[i] = new Directories.DataDirectory(tempDir); + } + + ColumnFamilyStore cfs = 
Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); + MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories)); + mockCFS.disableAutoCompaction(); + mockCFS.addSSTables(cfs.getLiveSSTables()); + return mockCFS; + } + + /** + * Updates the boundaries with a delta + */ + private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta) + { + for (int j = 0; j < boundaries.length - 1; j++) + { + if ((j + delta) % 2 == 0) + boundaries[j] -= delta; + else + boundaries[j] += delta; + } + boundaryManager.invalidateBoundaries(); + } + + private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader) + { + // Check that sstable is assigned to correct disk + int index = getSSTableIndex(boundaries, reader); + assertEquals(index, csm.compactionStrategyIndexFor(reader)); + // Check that compaction strategy actually contains SSTable + assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader)); + } + + /** + * Creates disk boundaries such that each disk receives + * an equal amount of SSTables + */ + private Integer[] computeBoundaries(int numSSTables, int numDisks) + { + Integer[] result = new Integer[numDisks]; + int sstablesPerRange = numSSTables / numDisks; + result[0] = sstablesPerRange; + for (int i = 1; i < numDisks; i++) + { + result[i] = result[i - 1] + sstablesPerRange; + } + result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors + return result; + } + + /** + * Since each SSTable contains keys from 0-99, and each sstable + * generation is numbered from 1-100, since we are using ByteOrderedPartitioner + * we can compute the sstable position in the disk boundaries by finding + * the generation position relative to the boundaries + */ + private int getSSTableIndex(Integer[] boundaries, SSTableReader reader) + { + int index = 0; + while 
(boundaries[index] < reader.descriptor.generation) + index++; + System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index); + return index; + } + + + + class MockBoundaryManager + { + private final ColumnFamilyStore cfs; + private Integer[] positions; + private DiskBoundaries boundaries; + + public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions) + { + this.cfs = cfs; + this.positions = positions; + this.boundaries = createDiskBoundaries(cfs, positions); + } + + public void invalidateBoundaries() + { + boundaries.invalidate(); + } + + public DiskBoundaries getBoundaries() + { + if (boundaries.isOutOfDate()) + boundaries = createDiskBoundaries(cfs, positions); + return boundaries; + } + + private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries) + { + List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList()); - return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0); ++ return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), positions, 0, 0); + } + } + + private static void createSSTableWithKey(String keyspace, String table, int key) + { + long timestamp = System.currentTimeMillis(); + DecoratedKey dk = Util.dk(String.format("%04d", key)); + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey()) + .clustering(Integer.toString(key)) + .add("val", "val") + .build() + .applyUnsafe(); + cfs.forceBlockingFlush(); + } + + // just to be able to override the data directories + private static class MockCFS extends ColumnFamilyStore + { + MockCFS(ColumnFamilyStore cfs, Directories dirs) + { + super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true); + } + } +} diff --cc 
test/unit/org/apache/cassandra/io/util/FileUtilsTest.java index 0e7d8c8,8d1b752..2f9ccd4 --- a/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java +++ b/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java @@@ -20,19 -20,13 +20,20 @@@ package org.apache.cassandra.io.util import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.charset.Charset; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.config.DatabaseDescriptor; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class FileUtilsTest @@@ -66,45 -54,12 +67,55 @@@ } @Test + public void testFolderSize() throws Exception + { + File folder = createFolder(Paths.get(DatabaseDescriptor.getAllDataFileLocations()[0], "testFolderSize")); + folder.deleteOnExit(); + + File childFolder = createFolder(Paths.get(folder.getPath(), "child")); + + File[] files = { + createFile(new File(folder, "001"), 10000), + createFile(new File(folder, "002"), 1000), + createFile(new File(folder, "003"), 100), + createFile(new File(childFolder, "001"), 1000), + createFile(new File(childFolder, "002"), 2000), + }; + + assertEquals(0, FileUtils.folderSize(new File(folder, "i_dont_exist"))); + assertEquals(files[0].length(), FileUtils.folderSize(files[0])); + + long size = FileUtils.folderSize(folder); + assertEquals(Arrays.stream(files).mapToLong(f -> f.length()).sum(), size); + } + ++ @Test + public void testIsContained() + { + assertTrue(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abc"))); + assertFalse(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abcd"))); + assertTrue(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abc/d"))); + assertTrue(FileUtils.isContained(new File("/tmp/abc/../abc"), new File("/tmp/abc/d"))); + 
assertFalse(FileUtils.isContained(new File("/tmp/abc/../abc"), new File("/tmp/abcc"))); + } ++ + private File createFolder(Path path) + { + File folder = path.toFile(); + FileUtils.createDirectory(folder); + return folder; + } + + private File createFile(File file, long size) + { + try (RandomAccessFile f = new RandomAccessFile(file, "rw")) + { + f.setLength(size); + } + catch (Exception e) + { + System.err.println(e); + } + return file; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
