Repository: cassandra Updated Branches: refs/heads/trunk 2fcd29b83 -> 127cfff26
Add ability to load new SSTables from a separate directory Patch by marcuse; reviewed by Jordan West for CASSANDRA-6719 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/127cfff2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/127cfff2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/127cfff2 Branch: refs/heads/trunk Commit: 127cfff26bbed97d2ae9cb3b0a65824d130ec78b Parents: 2fcd29b Author: Marcus Eriksson <[email protected]> Authored: Fri Feb 16 15:46:58 2018 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Wed Apr 18 14:59:00 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + .../apache/cassandra/db/ColumnFamilyStore.java | 184 +++++++-- .../cassandra/db/ColumnFamilyStoreMBean.java | 16 +- .../org/apache/cassandra/db/Directories.java | 51 ++- .../cassandra/service/StorageService.java | 10 +- .../cassandra/service/StorageServiceMBean.java | 11 +- .../org/apache/cassandra/tools/NodeProbe.java | 6 + .../org/apache/cassandra/tools/NodeTool.java | 1 + .../apache/cassandra/tools/nodetool/Import.java | 94 +++++ .../cassandra/tools/nodetool/Refresh.java | 1 + .../org/apache/cassandra/db/ImportTest.java | 402 +++++++++++++++++++ .../org/apache/cassandra/db/VerifyTest.java | 2 +- 13 files changed, 746 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 76321dd..6c9e30a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719) * Eliminate background repair and probablistic read_repair_chance table options (CASSANDRA-13910) * Bind to correct local address in 4.0 streaming 
(CASSANDRA-14362) http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index e8e88a4..9216bc0 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -38,6 +38,8 @@ using the provided 'sstableupgrade' tool. New features ------------ + - `nodetool refresh` has been deprecated in favour of `nodetool import` - see CASSANDRA-6719 + for details - An experimental option to compare all merkle trees together has been added - for example, in a 3 node cluster with 2 replicas identical and 1 out-of-date, with this option enabled, the out-of-date replica will only stream a single copy from up-to-date replica. Enable it by adding http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 50db418..2bde9a8 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -74,11 +74,13 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.repair.TableRepairManager; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.TableStreamManager; @@ 
-682,31 +684,154 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } /** - * See #{@code StorageService.loadNewSSTables(String, String)} for more info + * See #{@code StorageService.importNewSSTables} for more info * * @param ksName The keyspace name * @param cfName The columnFamily name */ - public static void loadNewSSTables(String ksName, String cfName) + public static void loadNewSSTables(String ksName, String cfName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) { /** ks/cf existence checks will be done by open and getCFS methods for us */ Keyspace keyspace = Keyspace.open(ksName); - keyspace.getColumnFamilyStore(cfName).loadNewSSTables(); + keyspace.getColumnFamilyStore(cfName).loadNewSSTables(srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); + } + + + @Deprecated + public synchronized void loadNewSSTables() + { + loadNewSSTables(null, true, false, false, false, false, false); + } + + /** + * Iterates over all keys in the sstable index and invalidates the row cache + * + * also counts the number of tokens that should be on each disk in JBOD-config to minimize the amount of data compaction + * needs to move around + */ + @VisibleForTesting + static File findBestDiskAndInvalidateCaches(ColumnFamilyStore cfs, Descriptor desc, String srcPath, boolean clearCaches, boolean jbodCheck) throws IOException + { + int boundaryIndex = 0; + DiskBoundaries boundaries = cfs.getDiskBoundaries(); + boolean shouldCountKeys = boundaries.positions != null && jbodCheck; + if (!cfs.isRowCacheEnabled() || !clearCaches) + { + if (srcPath == null) // user has dropped the sstables in the data directory, use it directly + return desc.directory; + if (boundaries.directories != null && boundaries.directories.size() == 1) // only a single data directory, use it without counting keys + return 
cfs.directories.getLocationForDisk(boundaries.directories.get(0)); + if (!shouldCountKeys) // for non-random partitioners positions can be null, get the directory with the most space available + return cfs.directories.getWriteableLocationToLoadFile(new File(desc.baseFilename())); + } + + long count = 0; + int maxIndex = 0; + long maxCount = 0; + try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(desc.filenameFor(Component.PRIMARY_INDEX)))) + { + long indexSize = primaryIndex.length(); + while (primaryIndex.getFilePointer() != indexSize) + { + ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); + RowIndexEntry.Serializer.skip(primaryIndex, desc.version); + DecoratedKey decoratedKey = cfs.metadata().partitioner.decorateKey(key); + if (clearCaches) + cfs.invalidateCachedPartition(decoratedKey); + if (shouldCountKeys) + { + while (boundaries.positions.get(boundaryIndex).compareTo(decoratedKey) < 0) + { + logger.debug("{} has {} keys in {}", desc, count, boundaries.positions.get(boundaryIndex)); + if (count > maxCount) + { + maxIndex = boundaryIndex; + maxCount = count; + } + boundaryIndex++; + count = 0; + } + count++; + } + } + if (shouldCountKeys) + { + if (count > maxCount) + maxIndex = boundaryIndex; + logger.debug("{} has {} keys in {}", desc, count, boundaries.positions.get(boundaryIndex)); + } + } + File dir; + if (srcPath == null) + dir = desc.directory; + else if (shouldCountKeys) + dir = cfs.directories.getLocationForDisk(boundaries.directories.get(maxIndex)); + else + dir = cfs.directories.getWriteableLocationToLoadFile(new File(desc.baseFilename())); + return dir; } /** * #{@inheritDoc} */ - public synchronized void loadNewSSTables() + public synchronized void loadNewSSTables(String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) { - logger.info("Loading new SSTables for {}/{}...", keyspace.getName(), name); + 
logger.info("Loading new SSTables for {}/{} from {}... (resetLevel = {}, clearRepaired = {}, verifySSTables = {}, verifyTokens = {}, invalidateCaches = {}, jbodCheck = {})", + keyspace.getName(), name, srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); + + File dir = null; + if (srcPath != null && !srcPath.isEmpty()) + { + dir = new File(srcPath); + if (!dir.exists()) + { + throw new RuntimeException(String.format("Directory %s does not exist", srcPath)); + } + if (!Directories.verifyFullPermissions(dir, srcPath)) + { + throw new RuntimeException("Insufficient permissions on directory " + srcPath); + } + } Set<Descriptor> currentDescriptors = new HashSet<>(); for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL)) currentDescriptors.add(sstable.descriptor); Set<SSTableReader> newSSTables = new HashSet<>(); + Directories.SSTableLister lister = dir == null ? + directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true) : + directories.sstableLister(dir, Directories.OnTxnErr.IGNORE).skipTemporary(true); + + // verify first to avoid starting to copy sstables to the data directories and then have to abort. 
+ if (verifySSTables || verifyTokens) + { + for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) + { + Descriptor descriptor = entry.getKey(); + SSTableReader reader = null; + try + { + reader = SSTableReader.open(descriptor, entry.getValue(), metadata); + Verifier.Options verifierOptions = Verifier.options().extendedVerification(verifyTokens) + .checkOwnsTokens(verifyTokens) + .invokeDiskFailurePolicy(false) + .mutateRepairStatus(false).build(); + try (Verifier verifier = new Verifier(this, reader, false, verifierOptions)) + { + verifier.verify(); + } + } + catch (Throwable t) + { + throw new RuntimeException("Can't import sstable "+descriptor, t); + } + finally + { + if (reader != null) + reader.selfRef().release(); + } + } + } - Directories.SSTableLister lister = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) { Descriptor descriptor = entry.getKey(); @@ -719,28 +844,42 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean descriptor.getFormat().getLatestVersion(), descriptor)); - // force foreign sstables to level 0 + File targetDirectory; try { if (new File(descriptor.filenameFor(Component.STATS)).exists()) - descriptor.getMetadataSerializer().mutateLevel(descriptor, 0); + { + if (resetLevel) + { + descriptor.getMetadataSerializer().mutateLevel(descriptor, 0); + } + if (clearRepaired) + { + descriptor.getMetadataSerializer().mutateRepaired(descriptor, + ActiveRepairService.UNREPAIRED_SSTABLE, + null); + } + } + targetDirectory = findBestDiskAndInvalidateCaches(this, descriptor, srcPath, invalidateCaches, jbodCheck); + logger.debug("{} will get copied to {}", descriptor, targetDirectory); } catch (IOException e) { - FileUtils.handleCorruptSSTable(new CorruptSSTableException(e, entry.getKey().filenameFor(Component.STATS))); - logger.error("Cannot read sstable {}; other IO error, skipping table", entry, e); - 
continue; + logger.error("{} is corrupt, can't import", descriptor, e); + throw new RuntimeException(e); } - // Increment the generation until we find a filename that doesn't exist. This is needed because the new - // SSTables that are being loaded might already use these generation numbers. Descriptor newDescriptor; do { newDescriptor = new Descriptor(descriptor.version, - descriptor.directory, + // If source dir is not provided, then we are just loading from data directory, so use same data directory otherwise + // get the most suitable location to load into + targetDirectory, descriptor.ksname, descriptor.cfname, + // Increment the generation until we find a filename that doesn't exist. This is needed because the new + // SSTables that are being loaded might already use these generation numbers. fileIndexGenerator.incrementAndGet(), descriptor.formatType); } @@ -754,17 +893,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata); } - catch (CorruptSSTableException ex) - { - FileUtils.handleCorruptSSTable(ex); - logger.error("Corrupt sstable {}; skipping table", entry, ex); - continue; - } - catch (FSError ex) + catch (Throwable t) { - FileUtils.handleFSError(ex); - logger.error("Cannot read sstable {}; file system error, skipping table", entry, ex); - continue; + for (SSTableReader sstable : newSSTables) + sstable.selfRef().release(); + // log which sstables we have copied so far, so that the operator can remove them + if (srcPath != null) + logger.error("Aborting import of sstables. 
{} copied, {} was corrupt", newSSTables, newDescriptor); + throw new RuntimeException(newDescriptor+" is corrupt, can't import", t); } newSSTables.add(reader); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index bdb842d..7f416bf 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -142,12 +142,22 @@ public interface ColumnFamilyStoreMBean */ public List<String> getSSTablesForKey(String key, boolean hexFormat); + /** - * Scan through Keyspace/ColumnFamily's data directory - * determine which SSTables should be loaded and load them + * Load new sstables from the given directory + * + * @param srcPath the path to the new sstables - if it is null, the data directories will be scanned + * @param resetLevel if the level should be reset to 0 on the new sstables + * @param clearRepaired if repaired info should be wiped from the new sstables + * @param verifySSTables if the new sstables should be verified that they are not corrupt + * @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node + * @param invalidateCaches if row cache should be invalidated for the keys in the new sstables + * @param jbodCheck if the new sstables should be placed 'optimally' - count tokens and move the sstable to the directory where it has the most keys */ - public void loadNewSSTables(); + public void loadNewSSTables(String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck); + @Deprecated + public void loadNewSSTables(); /** * @return the number of SSTables in L0. 
Always return 0 if Leveled compaction is not enabled. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 1fc071a..afec1a4 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileFilter; import java.io.IOError; import java.io.IOException; +import java.nio.file.FileStore; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; @@ -333,6 +334,41 @@ public class Directories } /** + * Returns a data directory to load the file {@code sourceFile}. If the sourceFile is on same disk partition as any + * data directory then use that one as data directory otherwise use {@link #getWriteableLocationAsFile(long)} to + * find suitable data directory. + * + * Also makes sure returned directory is non-blacklisted. + * + * @throws FSWriteError if all directories are blacklisted + */ + public File getWriteableLocationToLoadFile(final File sourceFile) + { + try + { + final FileStore srcFileStore = Files.getFileStore(sourceFile.toPath()); + for (final File dataPath : dataPaths) + { + if (BlacklistedDirectories.isUnwritable(dataPath)) + { + continue; + } + + if (Files.getFileStore(dataPath.toPath()).equals(srcFileStore)) + { + return dataPath; + } + } + } + catch (final IOException e) + { + // pass exceptions in finding filestore. This is best effort anyway. Fall back on getWriteableLocationAsFile() + } + + return getWriteableLocationAsFile(sourceFile.length()); + } + + /** * Returns a temporary subdirectory on non-blacklisted data directory * that _currently_ has {@code writeSize} bytes as usable space. * This method does not create the temporary directory. 
@@ -642,10 +678,15 @@ public class Directories public SSTableLister sstableLister(OnTxnErr onTxnErr) { - return new SSTableLister(onTxnErr); + return new SSTableLister(this.dataPaths, this.metadata, onTxnErr); + } + + public SSTableLister sstableLister(File directory, OnTxnErr onTxnErr) + { + return new SSTableLister(new File[]{directory}, metadata, onTxnErr); } - public class SSTableLister + public static class SSTableLister { private final OnTxnErr onTxnErr; private boolean skipTemporary; @@ -655,9 +696,13 @@ public class Directories private final Map<Descriptor, Set<Component>> components = new HashMap<>(); private boolean filtered; private String snapshotName; + private final File[] dataPaths; + private final TableMetadata metadata; - private SSTableLister(OnTxnErr onTxnErr) + private SSTableLister(File[] dataPaths, TableMetadata metadata, OnTxnErr onTxnErr) { + this.dataPaths = dataPaths; + this.metadata = metadata; this.onTxnErr = onTxnErr; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 45a6da1..6cbc49a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -5232,14 +5232,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE LifecycleTransaction.rescheduleFailedDeletions(); } + @Deprecated + public void loadNewSSTables(String ksName, String cfName) + { + ColumnFamilyStore.loadNewSSTables(ksName, cfName, null, true, false, false, false, false, false); + } + /** * #{@inheritDoc} */ - public void loadNewSSTables(String ksName, String cfName) + public void importNewSSTables(String ksName, String cfName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean 
verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) { if (!isInitialized()) throw new RuntimeException("Not yet initialized, can't load new sstables"); - ColumnFamilyStore.loadNewSSTables(ksName, cfName); + ColumnFamilyStore.loadNewSSTables(ksName, cfName, srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 206a37b..62a73de 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -583,13 +583,20 @@ public interface StorageServiceMBean extends NotificationEmitter public void rescheduleFailedDeletions(); + @Deprecated + public void loadNewSSTables(String ksName, String tableName); + /** - * Load new SSTables to the given keyspace/table + * Import new SSTables to the given keyspace/table * * @param ksName The parent keyspace name * @param tableName The ColumnFamily name where SSTables belong + * @param srcPath The path where the SSTables will be loaded from + * @param resetLevel reset the level to 0 on the new sstables + * @param clearRepaired remove any repaired information from the new sstables + * @param verifyTokens verify that all tokens are owned by the node */ - public void loadNewSSTables(String ksName, String tableName); + public void importNewSSTables(String ksName, String tableName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck); /** * Return a List of Tokens representing a sample of keys across all ColumnFamilyStores. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 04a5781..49b6563 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1169,11 +1169,17 @@ public class NodeProbe implements AutoCloseable return msProxy.getDroppedMessages(); } + @Deprecated public void loadNewSSTables(String ksName, String cfName) { ssProxy.loadNewSSTables(ksName, cfName); } + public void importNewSSTables(String ksName, String cfName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) + { + ssProxy.importNewSSTables(ksName, cfName, srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); + } + public void rebuildIndex(String ksName, String cfName, String... 
idxNames) { ssProxy.rebuildSecondaryIndex(ksName, cfName, idxNames); http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 2b6fabf..d4cc291 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -92,6 +92,7 @@ public class NodeTool GetSSTables.class, GetMaxHintWindow.class, GossipInfo.class, + Import.class, InvalidateKeyCache.class, InvalidateRowCache.class, InvalidateCounterCache.class, http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/tools/nodetool/Import.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Import.java b/src/java/org/apache/cassandra/tools/nodetool/Import.java new file mode 100644 index 0000000..1a6a69b --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/Import.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.nodetool; + +import static com.google.common.base.Preconditions.checkArgument; +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; + +import io.airlift.airline.Option; +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "import", description = "Import new SSTables to the system") +public class Import extends NodeToolCmd +{ + @Arguments(usage = "<keyspace> <table> <directory>", description = "The keyspace, table name and directory to import sstables from") + private List<String> args = new ArrayList<>(); + + @Option(title = "keep_level", + name = {"-l", "--keep-level"}, + description = "Keep the level on the new sstables") + private boolean keepLevel = false; + + @Option(title = "keep_repaired", + name = {"-r", "--keep-repaired"}, + description = "Keep any repaired information from the sstables") + private boolean keepRepaired = false; + + @Option(title = "no_verify_sstables", + name = {"-v", "--no-verify"}, + description = "Don't verify new sstables") + private boolean noVerify = false; + + @Option(title = "no_verify_tokens", + name = {"-t", "--no-tokens"}, + description = "Don't verify that all tokens in the new sstable are owned by the current node") + private boolean noVerifyTokens = false; + + @Option(title = "no_invalidate_caches", + name = {"-c", "--no-invalidate-caches"}, + description = "Don't invalidate the row cache when importing") + private boolean noInvalidateCaches = false; + + @Option(title = "no_jbod_check", + name = {"-j", "--no-jbod-check"}, + description = "Don't iterate over keys to check which data directory is best suited") + private boolean noJBODCheck = false; + + @Option(title = "quick", + name = {"-q", "--quick"}, + description = "Do a quick import without verifying sstables, clearing row cache or checking in which data directory to put the file") + 
private boolean quick = false; + + @Override + public void execute(NodeProbe probe) + { + checkArgument(args.size() == 3, "import requires keyspace, table name and directory"); + + if (!noVerifyTokens && noVerify) + { + noVerifyTokens = true; + System.out.println("Not verifying tokens since --no-verify or -v is set"); + } + + if (quick) + { + System.out.println("Doing a quick import - skipping sstable verification, row cache invalidation and JBOD checking"); + noVerifyTokens = true; + noInvalidateCaches = true; + noVerify = true; + noJBODCheck = true; + } + probe.importNewSSTables(args.get(0), args.get(1), args.get(2), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, !noJBODCheck); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/src/java/org/apache/cassandra/tools/nodetool/Refresh.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Refresh.java b/src/java/org/apache/cassandra/tools/nodetool/Refresh.java index 726f12a..3c8d4c9 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Refresh.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Refresh.java @@ -36,6 +36,7 @@ public class Refresh extends NodeToolCmd @Override public void execute(NodeProbe probe) { + System.out.println("nodetool refresh is deprecated, use nodetool import instead"); checkArgument(args.size() == 2, "refresh requires ks and cf args"); probe.loadNewSSTables(args.get(0), args.get(1)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/test/unit/org/apache/cassandra/db/ImportTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java new file mode 100644 index 0000000..197d79d --- /dev/null +++ b/test/unit/org/apache/cassandra/db/ImportTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.apache.cassandra.cache.RowCacheKey; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.dht.BootStrapper; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.StorageService; +import 
org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+// Tests for ColumnFamilyStore.loadNewSSTables(...): importing SSTables from a
+// separate (external) directory into a table's data directories (CASSANDRA-6719).
+//
+// NOTE(review): the 7-argument loadNewSSTables overload appears to be
+// (srcDir, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify)
+// — inferred from how the flags are exercised below; TODO confirm against ColumnFamilyStore.
+public class ImportTest extends CQLTester
+{
+
+    // Happy path: flush, move the sstables out, import them back, verify all rows return.
+    @Test
+    public void basicImportTest() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        getCurrentColumnFamilyStore().clearUnsafe();
+
+        File backupdir = moveToBackupDir(sstables);
+
+        assertEquals(0, execute("select * from %s").size());
+
+        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false);
+
+        assertEquals(10, execute("select * from %s").size());
+    }
+
+    // Legacy no-argument refresh path (picks sstables up from the table's own
+    // data directory); kept for backwards compatibility, hence @Deprecated.
+    @Test
+    @Deprecated
+    public void refreshTest() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        getCurrentColumnFamilyStore().clearUnsafe();
+        assertEquals(0, execute("select * from %s").size());
+        getCurrentColumnFamilyStore().loadNewSSTables();
+        assertEquals(10, execute("select * from %s").size());
+    }
+
+    // Compaction level 123 is preserved on a plain import, and reset to 0 when
+    // the resetLevel flag (2nd argument) is true.
+    @Test
+    public void importResetLevelTest() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        getCurrentColumnFamilyStore().clearUnsafe();
+        for (SSTableReader sstable : sstables)
+            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 123);
+        File backupdir = moveToBackupDir(sstables);
+        assertEquals(0, execute("select * from %s").size());
+
+        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false);
+
+        assertEquals(10, execute("select * from %s").size());
+        sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        assertEquals(1, sstables.size());
+        for (SSTableReader sstable : sstables)
+            assertEquals(123, sstable.getSSTableLevel());
+
+        getCurrentColumnFamilyStore().clearUnsafe();
+        backupdir = moveToBackupDir(sstables);
+        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), true, false, false, false, false, false);
+        sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        assertEquals(1, sstables.size());
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+            assertEquals(0, sstable.getSSTableLevel());
+    }
+
+
+    // repairedAt metadata is preserved on a plain import, and cleared when the
+    // clearRepaired flag (3rd argument) is true.
+    @Test
+    public void importClearRepairedTest() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        getCurrentColumnFamilyStore().clearUnsafe();
+        for (SSTableReader sstable : sstables)
+            sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 111, null);
+
+        File backupdir = moveToBackupDir(sstables);
+
+        assertEquals(0, execute("select * from %s").size());
+
+        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false);
+
+        assertEquals(10, execute("select * from %s").size());
+        sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        assertEquals(1, sstables.size());
+        for (SSTableReader sstable : sstables)
+            assertTrue(sstable.isRepaired());
+
+        getCurrentColumnFamilyStore().clearUnsafe();
+        backupdir = moveToBackupDir(sstables);
+        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, true, false, false, false, false);
+        sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        assertEquals(1, sstables.size());
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+            assertFalse(sstable.isRepaired());
+    }
+
+    // Moves every component file of the given sstables into a fresh temp dir,
+    // recreating the <keyspace>/<table> directory layout so the import code can
+    // resolve them. Returns the created <temp>/<ks>/<table> directory.
+    private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
+    {
+        Path temp = Files.createTempDirectory("importtest");
+        SSTableReader sst = sstables.iterator().next();
+        // NOTE(review): leftover debug output — consider removing or using a logger.
+        System.out.println("DIR: "+sst.descriptor.directory);
+        String tabledir = sst.descriptor.directory.getName();
+        String ksdir = sst.descriptor.directory.getParentFile().getName();
+        Path backupdir = Files.createDirectories(Paths.get(temp.toString(), ksdir, tabledir));
+
+        for (SSTableReader sstable : sstables)
+        {
+            for (File f : sstable.descriptor.directory.listFiles())
+            {
+                // move only the components belonging to this sstable generation
+                if (f.toString().contains(sstable.descriptor.baseFilename()))
+                {
+                    // NOTE(review): leftover debug output — consider removing or using a logger.
+                    System.out.println("move " + f.toPath() + " to " + backupdir);
+                    Files.move(f.toPath(), new File(backupdir.toFile(), f.getName()).toPath());
+                }
+            }
+        }
+        return backupdir.toFile();
+
+    }
+
+    // Counts which disk (by token boundary) owns the majority of the rows and
+    // asserts findBestDiskAndInvalidateCaches picks that disk for the sstable.
+    @Test
+    public void testBestDisk() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 1), InetAddressAndPort.getByName("127.0.0.1"));
+        Directories dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                                                                                        new Directories.DataDirectory(new File("/tmp/2")),
+                                                                                                        new Directories.DataDirectory(new File("/tmp/3"))));
+        MockCFS mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+
+        int rows = 1000;
+        Random rand = new Random();
+        for (int i = 0; i < rows; i++)
+            execute("insert into %s (id, d) values (?, ?)", rand.nextInt(), i);
+        UntypedResultSet res = execute("SELECT token(id) as t FROM %s");
+        long disk1 = 0, disk2 = 0, disk3 = 0;
+        DiskBoundaries boundaries = mock.getDiskBoundaries();
+        // tally how many rows fall on each of the three mock disks
+        for (UntypedResultSet.Row r : res)
+        {
+            Token t = new Murmur3Partitioner.LongToken(r.getLong("t"));
+            if (boundaries.positions.get(0).compareTo(t.minKeyBound()) > 0)
+                disk1++;
+            else if (boundaries.positions.get(1).compareTo(t.minKeyBound()) > 0)
+                disk2++;
+            else
+                disk3++;
+        }
+        File expected;
+        if (disk1 >= disk2 && disk1 >= disk3)
+            expected = new File("/tmp/1");
+        else if (disk2 >= disk1 && disk2 >= disk3)
+            expected = new File("/tmp/2");
+        else
+            expected = new File("/tmp/3");
+
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        SSTableReader sstable = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+        File bestDisk = ColumnFamilyStore.findBestDiskAndInvalidateCaches(mock, sstable.descriptor, "/tmp/", false, true);
+        assertTrue(expected + " : "+ bestDisk, bestDisk.toString().startsWith(expected.toString()));
+    }
+
+    // With key counting disabled (all boolean flags false) the import should
+    // still land the sstable under one of the configured data directories.
+    @Test
+    public void testNoCounting() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        Directories dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                                                                                        new Directories.DataDirectory(new File("/tmp/2")),
+                                                                                                        new Directories.DataDirectory(new File("/tmp/3"))));
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        Set<SSTableReader> toMove = getCurrentColumnFamilyStore().getLiveSSTables();
+        getCurrentColumnFamilyStore().clearUnsafe();
+        File dir = moveToBackupDir(toMove);
+
+        MockCFS mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+        mock.loadNewSSTables(dir.toString(), false, false, false, false, false, false);
+        assertEquals(1, mock.getLiveSSTables().size());
+        for (SSTableReader sstable : mock.getLiveSSTables())
+        {
+            String expected = new File("/tmp/").getCanonicalPath();
+            assertTrue("dir = "+sstable.descriptor.directory + " : "+expected ,
+                       sstable.descriptor.directory.toString().startsWith(expected));
+            assertTrue(sstable.descriptor.directory.toString().contains(getCurrentColumnFamilyStore().metadata.id.toHexString()));
+        }
+    }
+
+    // Corrupt one sstable's digest, import with verification enabled: the import
+    // must fail and must not leave any sstable files behind in the data directory.
+    @Test
+    public void testImportCorrupt() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        SSTableReader sstableToCorrupt = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i + 10, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+
+        getCurrentColumnFamilyStore().clearUnsafe();
+
+        // flip the recorded checksum so digest verification fails
+        try (RandomAccessFile file = new RandomAccessFile(sstableToCorrupt.descriptor.filenameFor(Component.DIGEST), "rw"))
+        {
+            Long correctChecksum = Long.valueOf(file.readLine());
+            VerifyTest.writeChecksum(++correctChecksum, sstableToCorrupt.descriptor.filenameFor(Component.DIGEST));
+        }
+
+        File backupdir = moveToBackupDir(sstables);
+        try
+        {
+            getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, false, false, false);
+            fail("loadNewSSTables should fail!");
+        }
+        catch (Throwable t)
+        {
+            // a failed import must leave the data directory clean
+            for (File f : getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles())
+            {
+                if (f.isFile())
+                    fail("there should not be any sstables in the data directory after a failed import: " + f);
+            }
+        }
+    }
+
+
+    // With token verification enabled, importing sstables whose keys fall outside
+    // the node's owned ranges must throw.
+    @Test(expected = RuntimeException.class)
+    public void testImportOutOfRange() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        for (int i = 0; i < 1000; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+
+
+        getCurrentColumnFamilyStore().clearUnsafe();
+
+        // shrink the local node's ownership by adding two other token owners
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3"));
+
+
+        File backupdir = moveToBackupDir(sstables);
+        try
+        {
+            getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, false, false);
+        }
+        finally
+        {
+            tmd.clearUnsafe();
+        }
+    }
+
+
+    // Row cache entries for the imported sstable's keys are kept when
+    // invalidateCaches is false and dropped when it is true; unrelated
+    // cached keys must survive either way.
+    @Test
+    public void testImportInvalidateCache() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int) WITH caching = { 'keys': 'NONE', 'rows_per_partition': 'ALL' }");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        CacheService.instance.setRowCacheCapacityInMB(1);
+
+        Set<RowCacheKey> keysToInvalidate = new HashSet<>();
+
+        // populate the row cache with keys from the sstable we are about to remove
+        for (int i = 0; i < 10; i++)
+        {
+            execute("SELECT * FROM %s WHERE id = ?", i);
+        }
+        Iterator<RowCacheKey> it = CacheService.instance.rowCache.keyIterator();
+        while (it.hasNext())
+        {
+            keysToInvalidate.add(it.next());
+        }
+        SSTableReader sstableToImport = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+        getCurrentColumnFamilyStore().clearUnsafe();
+
+
+        for (int i = 10; i < 20; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+
+        Set<RowCacheKey> allCachedKeys = new HashSet<>();
+
+        // populate row cache with sstable we are keeping
+        for (int i = 10; i < 20; i++)
+        {
+            execute("SELECT * FROM %s WHERE id = ?", i);
+        }
+        it = CacheService.instance.rowCache.keyIterator();
+        while (it.hasNext())
+        {
+            allCachedKeys.add(it.next());
+        }
+        assertEquals(20, CacheService.instance.rowCache.size());
+        File backupdir = moveToBackupDir(Collections.singleton(sstableToImport));
+        // make sure we don't wipe caches with invalidateCaches = false:
+        Set<SSTableReader> beforeFirstImport = getCurrentColumnFamilyStore().getLiveSSTables();
+        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, false, false);
+        assertEquals(20, CacheService.instance.rowCache.size());
+        Set<SSTableReader> toMove = Sets.difference(getCurrentColumnFamilyStore().getLiveSSTables(), beforeFirstImport);
+        getCurrentColumnFamilyStore().clearUnsafe();
+        // move away the sstable we just imported again:
+        backupdir = moveToBackupDir(toMove);
+        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, true, false);
+        assertEquals(10, CacheService.instance.rowCache.size());
+        it = CacheService.instance.rowCache.keyIterator();
+        while (it.hasNext())
+        {
+            // make sure the keys from the sstable we are importing are invalidated and that the other one is still there
+            RowCacheKey rck = it.next();
+            assertTrue(allCachedKeys.contains(rck));
+            assertFalse(keysToInvalidate.contains(rck));
+        }
+    }
+
+    // Passing a null source directory falls back to the table's own data
+    // directory (refresh semantics) and still works with row caching enabled.
+    @Test
+    public void testImportCacheEnabledWithoutSrcDir() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int) WITH caching = { 'keys': 'NONE', 'rows_per_partition': 'ALL' }");
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        CacheService.instance.setRowCacheCapacityInMB(1);
+        getCurrentColumnFamilyStore().clearUnsafe();
+        getCurrentColumnFamilyStore().loadNewSSTables(null, false, false, false, false, true, false);
+        assertEquals(1, getCurrentColumnFamilyStore().getLiveSSTables().size());
+    }
+
+    // ColumnFamilyStore wrapper that substitutes a custom set of data
+    // directories, so disk-selection can be tested against /tmp/{1,2,3}.
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        public MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/127cfff2/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index e996082..42c4fd3 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -632,7 +632,7 @@ public class VerifyTest
         }
     }
 
-    protected void writeChecksum(long checksum, String filePath)
+    public static void writeChecksum(long checksum, String filePath)
     {
         File outFile = new File(filePath);
         BufferedWriter out = null;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
