This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 309b3033d44c5cdc18d6e3897661966853d39407
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Mon Feb 15 16:23:10 2021 +0100

    Add possibility to copy SSTables in SSTableImporter instead of moving them

    Patch by Stefan Miklosovic; reviewed by Michael Semb Wever and Marcus Eriksson for CASSANDRA-16407
---
 CHANGES.txt                                         |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java  |  10 +-
 .../cassandra/db/ColumnFamilyStoreMBean.java        |  27 ++++-
 .../org/apache/cassandra/db/SSTableImporter.java    |  47 +++++++--
 .../cassandra/io/sstable/format/SSTableReader.java  |  23 ++++-
 .../cassandra/io/sstable/format/SSTableWriter.java  |  27 +++++
 .../org/apache/cassandra/io/util/FileUtils.java     | 115 ++++++++++++++++-----
 src/java/org/apache/cassandra/tools/NodeProbe.java  |   4 +-
 .../apache/cassandra/tools/nodetool/Import.java     |   7 +-
 test/unit/org/apache/cassandra/db/ImportTest.java   |  77 +++++++++++---
 .../cassandra/io/sstable/SSTableReaderTest.java     |   6 +-
 11 files changed, 284 insertions(+), 60 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 839302b..24163c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Add possibility to copy SSTables in SSTableImporter instead of moving them (CASSANDRA-16407)
  * Fix DESCRIBE statement for CUSTOM indices with options (CASSANDRA-16482)
  * Fix cassandra-stress JMX connection (CASSANDRA-16473)
  * Avoid processing redundant application states on endpoint changes (CASSANDRA-16381)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1089e13..00aebc4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -709,7 +709,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * #{@inheritDoc}
      */
-    public synchronized List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+    public synchronized List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
     {
         SSTableImporter.Options options = SSTableImporter.Options.options(srcPaths)
                                                                   .resetLevel(resetLevel)
@@ -717,11 +717,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                   .verifySSTables(verifySSTables)
                                                                   .verifyTokens(verifyTokens)
                                                                   .invalidateCaches(invalidateCaches)
-                                                                  .extendedVerify(extendedVerify).build();
+                                                                  .extendedVerify(extendedVerify)
+                                                                  .copyData(copyData).build();
 
         return sstableImporter.importNewSSTables(options);
     }
 
+    public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+    {
+        return importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, false);
+    }
+
     Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
     {
         Descriptor newDescriptor;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index c2cf393..23757ba 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -152,18 +152,41 @@ public interface ColumnFamilyStoreMBean
      * @param verifySSTables if the new sstables should be verified that they are not corrupt
      * @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node
      * @param invalidateCaches if row cache should be invalidated for the keys in the new sstables
-     * @param jbodCheck if the new sstables should be placed 'optimally' - count tokens and move the sstable to the directory where it has the most keys
      * @param extendedVerify if we should run an extended verify checking all values in the new sstables
      *
      * @return list of failed import directories
      */
+    @Deprecated
+    public List<String> importNewSSTables(Set<String> srcPaths,
+                                          boolean resetLevel,
+                                          boolean clearRepaired,
+                                          boolean verifySSTables,
+                                          boolean verifyTokens,
+                                          boolean invalidateCaches,
+                                          boolean extendedVerify);
+
+    /**
+     * Load new sstables from the given directory
+     *
+     * @param srcPaths the path to the new sstables - if it is an empty set, the data directories will be scanned
+     * @param resetLevel if the level should be reset to 0 on the new sstables
+     * @param clearRepaired if repaired info should be wiped from the new sstables
+     * @param verifySSTables if the new sstables should be verified that they are not corrupt
+     * @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node
+     * @param invalidateCaches if row cache should be invalidated for the keys in the new sstables
+     * @param extendedVerify if we should run an extended verify checking all values in the new sstables
+     * @param copyData if we should copy data from source paths instead of moving them
+     *
+     * @return list of failed import directories
+     */
     public List<String> importNewSSTables(Set<String> srcPaths,
                                           boolean resetLevel,
                                           boolean clearRepaired,
                                           boolean verifySSTables,
                                           boolean verifyTokens,
                                           boolean invalidateCaches,
-                                          boolean extendedVerify);
+                                          boolean extendedVerify,
+                                          boolean copyData);
 
     @Deprecated
     public void loadNewSSTables();
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java
index 7597f82..989ff12 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -40,9 +39,7 @@
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -139,7 +136,7 @@ public class SSTableImporter
                     Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
                     maybeMutateMetadata(entry.getKey(), options);
                     movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue()));
-                    SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue());
+                    SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData);
                     newSSTablesPerDirectory.add(sstable);
                 }
                 catch (Throwable t)
@@ -149,7 +146,14 @@ public class SSTableImporter
                 {
                     logger.error("Failed importing sstables in directory {}", dir, t);
                     failedDirectories.add(dir);
-                    moveSSTablesBack(movedSSTables);
+                    if (options.copyData)
+                    {
+                        removeCopiedSSTables(movedSSTables);
+                    }
+                    else
+                    {
+                        moveSSTablesBack(movedSSTables);
+                    }
                     movedSSTables.clear();
                     newSSTablesPerDirectory.clear();
                     break;
@@ -285,6 +289,25 @@ public class SSTableImporter
     }
 
     /**
+     * Similarly for moving case, we need to delete all SSTables which were copied already but the
+     * copying as a whole has failed so we do not leave any traces behind such failed import.
+     *
+     * @param movedSSTables tables we have moved already (by copying) which need to be removed
+     */
+    private void removeCopiedSSTables(Set<MovedSSTable> movedSSTables)
+    {
+        logger.debug("Removing copied SSTables which were left in data directories after failed SSTable import.");
+        for (MovedSSTable movedSSTable : movedSSTables)
+        {
+            if (new File(movedSSTable.newDescriptor.filenameFor(Component.DATA)).exists())
+            {
+                // no logging here as for moveSSTablesBack case above as logging is done in delete method
+                SSTableWriter.delete(movedSSTable.newDescriptor, movedSSTable.components);
+            }
+        }
+    }
+
+    /**
      * Iterates over all keys in the sstable index and invalidates the row cache
      */
     @VisibleForTesting
@@ -365,8 +388,9 @@ public class SSTableImporter
         private final boolean verifyTokens;
        private final boolean invalidateCaches;
        private final boolean extendedVerify;
+        private final boolean copyData;
 
-        public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+        public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
         {
             this.srcPaths = srcPaths;
             this.resetLevel = resetLevel;
@@ -375,6 +399,7 @@ public class SSTableImporter
             this.verifyTokens = verifyTokens;
             this.invalidateCaches = invalidateCaches;
             this.extendedVerify = extendedVerify;
+            this.copyData = copyData;
         }
 
         public static Builder options(String srcDir)
@@ -403,6 +428,7 @@ public class SSTableImporter
                    ", verifyTokens=" + verifyTokens +
                    ", invalidateCaches=" + invalidateCaches +
                    ", extendedVerify=" + extendedVerify +
+                   ", copyData= " + copyData +
                    '}';
         }
 
@@ -415,6 +441,7 @@ public class SSTableImporter
         private boolean verifyTokens = false;
         private boolean invalidateCaches = false;
         private boolean extendedVerify = false;
+        private boolean copyData = false;
 
         private Builder(Set<String> srcPath)
         {
@@ -458,9 +485,15 @@ public class SSTableImporter
             return this;
         }
 
+        public Builder copyData(boolean value)
+        {
+            copyData = value;
+            return this;
+        }
+
         public Options build()
         {
-            return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify);
+            return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData);
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 3e14422..652d9c0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.UnknownColumnException;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.*;
@@ -2254,7 +2255,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      *
      * All components given will be moved/renamed
      */
-    public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components)
+    public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components, boolean copyData)
     {
         if (!oldDescriptor.isCompatible())
             throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
@@ -2276,8 +2277,24 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             throw new RuntimeException(msg);
         }
 
-        logger.info("Renaming new SSTable {} to {}", oldDescriptor, newDescriptor);
-        SSTableWriter.rename(oldDescriptor, newDescriptor, components);
+        if (copyData)
+        {
+            try
+            {
+                logger.info("Hardlinking new SSTable {} to {}", oldDescriptor, newDescriptor);
+                SSTableWriter.hardlink(oldDescriptor, newDescriptor, components);
+            }
+            catch (FSWriteError ex)
+            {
+                logger.warn("Unable to hardlink new SSTable {} to {}, falling back to copying", oldDescriptor, newDescriptor, ex);
+                SSTableWriter.copy(oldDescriptor, newDescriptor, components);
+            }
+        }
+        else
+        {
+            logger.info("Moving new SSTable {} to {}", oldDescriptor, newDescriptor);
+            SSTableWriter.rename(oldDescriptor, newDescriptor, components);
+        }
 
         SSTableReader reader;
         try
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 1dbfcdb..cce5378 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -344,6 +344,33 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
     }
 
+    public static void copy(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
+    {
+        for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
+        {
+            FileUtils.copyWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+        }
+
+        // do -Data last because -Data present should mean the sstable was completely copied before crash
+        FileUtils.copyWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+        // copy it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+        FileUtils.copyWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
+    }
+
+    public static void hardlink(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
+    {
+        for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
+        {
+            FileUtils.createHardLinkWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+        }
+
+        // do -Data last because -Data present should mean the sstable was completely copied before crash
+        FileUtils.createHardLinkWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+        // copy it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+        FileUtils.createHardLinkWithoutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
+    }
 
     public static abstract class Factory
     {
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index e0ea436..7798bd7 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -114,28 +114,6 @@ public final class FileUtils
         }
     }
 
-    public static void createHardLink(String from, String to)
-    {
-        createHardLink(new File(from), new File(to));
-    }
-
-    public static void createHardLink(File from, File to)
-    {
-        if (to.exists())
-            throw new RuntimeException("Tried to create duplicate hard link to " + to);
-        if (!from.exists())
-            throw new RuntimeException("Tried to hard link to file that does not exist " + from);
-
-        try
-        {
-            Files.createLink(to.toPath(), from.toPath());
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, to);
-        }
-    }
-
     private static final File tempDir = new File(JAVA_IO_TMPDIR.getString());
 
     private static final AtomicLong tempFileNum = new AtomicLong();
@@ -193,6 +171,62 @@ public final class FileUtils
         return f;
     }
 
+    public static void createHardLink(String from, String to)
+    {
+        createHardLink(new File(from), new File(to));
+    }
+
+    public static void createHardLink(File from, File to)
+    {
+        if (to.exists())
+            throw new RuntimeException("Tried to create duplicate hard link to " + to);
+        if (!from.exists())
+            throw new RuntimeException("Tried to hard link to file that does not exist " + from);
+
+        try
+        {
+            Files.createLink(to.toPath(), from.toPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, to);
+        }
+    }
+
+    public static void createHardLinkWithConfirm(File from, File to)
+    {
+        try
+        {
+            createHardLink(from, to);
+        }
+        catch (FSWriteError ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException(String.format("Unable to hardlink from %s to %s", from, to), t);
+        }
+    }
+
+    public static void createHardLinkWithConfirm(String from, String to)
+    {
+        createHardLinkWithConfirm(new File(from), new File(to));
+    }
+
+    public static void createHardLinkWithoutConfirm(String from, String to)
+    {
+        try
+        {
+            createHardLink(new File(from), new File(to));
+        }
+        catch (FSWriteError fse)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Could not hardlink file " + from + " to " + to, fse);
+        }
+    }
+
     public static Throwable deleteWithConfirm(String filePath, Throwable accumulate)
     {
         return deleteWithConfirm(new File(filePath), accumulate, null);
@@ -244,6 +278,40 @@ public final class FileUtils
         maybeFail(deleteWithConfirm(file, null, rateLimiter));
     }
 
+    public static void copyWithOutConfirm(String from, String to)
+    {
+        try
+        {
+            Files.copy(Paths.get(from), Paths.get(to));
+        }
+        catch (IOException e)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Could not copy file" + from + " to " + to, e);
+        }
+    }
+
+    public static void copyWithConfirm(String from, String to)
+    {
+        copyWithConfirm(new File(from), new File(to));
+    }
+
+    public static void copyWithConfirm(File from, File to)
+    {
+        assert from.exists();
+        if (logger.isTraceEnabled())
+            logger.trace("Copying {} to {}", from.getPath(), to.getPath());
+
+        try
+        {
+            Files.copy(from.toPath(), to.toPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, "Could not copy file" + from + " to " + to);
+        }
+    }
+
     public static void renameWithOutConfirm(String from, String to)
     {
         try
@@ -298,6 +366,7 @@ public final class FileUtils
         }
     }
 
+
     public static void truncate(String path, long size)
     {
         try(FileChannel channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE))
@@ -899,7 +968,7 @@ public final class FileUtils
      * signed long (2^63-1), if the filesystem is any bigger, then the size overflows. {@code SafeFileStore} will
      * return {@code Long.MAX_VALUE} if the size overflow.</p>
      *
-     * @see https://bugs.openjdk.java.net/browse/JDK-8162520.
+     * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8162520">JDK-8162520</a>.
      */
    private static final class SafeFileStore extends FileStore
    {
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 90990f1..605edba 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1216,9 +1216,9 @@ public class NodeProbe implements AutoCloseable
         ssProxy.loadNewSSTables(ksName, cfName);
     }
 
-    public List<String> importNewSSTables(String ksName, String cfName, Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+    public List<String> importNewSSTables(String ksName, String cfName, Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
     {
-        return getCfsProxy(ksName, cfName).importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify);
+        return getCfsProxy(ksName, cfName).importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData);
     }
 
     public void rebuildIndex(String ksName, String cfName, String... idxNames)
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Import.java b/src/java/org/apache/cassandra/tools/nodetool/Import.java
index 08fe35d..73fa314 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Import.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Import.java
@@ -75,6 +75,11 @@ public class Import extends NodeToolCmd
             description = "Run an extended verify, verifying all values in the new sstables")
     private boolean extendedVerify = false;
 
+    @Option(title = "copy_data",
+            name = {"-p", "--copy-data"},
+            description = "Copy data from source directories instead of moving them")
+    private boolean copyData = false;
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -89,7 +94,7 @@ public class Import extends NodeToolCmd
             extendedVerify = false;
         }
         List<String> srcPaths = Lists.newArrayList(args.subList(2, args.size()));
-        List<String> failedDirs = probe.importNewSSTables(args.get(0), args.get(1), new HashSet<>(srcPaths), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, extendedVerify);
+        List<String> failedDirs = probe.importNewSSTables(args.get(0), args.get(1), new HashSet<>(srcPaths), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, extendedVerify, copyData);
         if (!failedDirs.isEmpty())
         {
             PrintStream err = probe.output().err;
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java
index 4094aa4..c0c3799 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -23,19 +23,16 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
 import org.junit.Test;
@@ -43,8 +40,6 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.BootStrapper;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,20 +48,45 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ImportTest extends CQLTester
 {
+
+    @Test
+    public void basicImportByMovingTest() throws Throwable
+    {
+        File backupDir = prepareBasicImporting();
+        // copy is false - so importing will be done by moving
+        importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(false).build(), 10);
+        // files were moved
+        Assert.assertEquals(0, countFiles(backupDir));
+    }
+
     @Test
-    public void basicImportTest() throws Throwable
+    public void basicImportByCopyingTest() throws Throwable
+    {
+        File backupDir = prepareBasicImporting();
+        // copy is true - so importing will be done by copying
+        importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(true).build(), 10);
+        // files are left there as they were just copied
+        Assert.assertNotEquals(0, countFiles(backupDir));
+    }
+
+    private File prepareBasicImporting() throws Throwable
     {
         createTable("create table %s (id int primary key, d int)");
+
         for (int i = 0; i < 10; i++)
+        {
             execute("insert into %s (id, d) values (?, ?)", i, i);
+        }
+
         getCurrentColumnFamilyStore().forceBlockingFlush();
+
         Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
         getCurrentColumnFamilyStore().clearUnsafe();
@@ -74,11 +94,14 @@ public class ImportTest extends CQLTester
 
         assertEquals(0, execute("select * from %s").size());
 
-        SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).build();
-        SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
-        importer.importNewSSTables(options);
+        return backupdir;
+    }
 
-        assertEquals(10, execute("select * from %s").size());
+    private List<String> importSSTables(SSTableImporter.Options options, int expectedRows) throws Throwable {
+        SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
+        List<String> failedDirectories = importer.importNewSSTables(options);
+        assertEquals(expectedRows, execute("select * from %s").size());
+        return failedDirectories;
     }
 
     @Test
@@ -276,7 +299,7 @@ public class ImportTest extends CQLTester
         sstable.selfRef().release();
     }
 
-    private void testCorruptHelper(boolean verify) throws Throwable
+    private void testCorruptHelper(boolean verify, boolean copy) throws Throwable
     {
         createTable("create table %s (id int primary key, d int)");
         for (int i = 0; i < 10; i++)
@@ -312,7 +335,7 @@ public class ImportTest extends CQLTester
         // first we moved out 2 sstables, one correct and one corrupt in to a single directory (backupdir)
         // then we moved out 1 sstable, a correct one (in backupdirCorrect).
         // now import should fail import on backupdir, but import the one in backupdirCorrect.
-        SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdirCorrect.toString())).verifySSTables(verify).build();
+        SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdirCorrect.toString())).copyData(copy).verifySSTables(verify).build();
         SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
         List<String> failedDirectories = importer.importNewSSTables(options);
         assertEquals(Collections.singletonList(backupdir.toString()), failedDirectories);
@@ -324,7 +347,15 @@ public class ImportTest extends CQLTester
         }
         assertEquals("Data dir should contain one file", 1, countFiles(getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables()));
         assertEquals("backupdir contained 2 files before import, should still contain 2 after failing to import it", beforeImport, Sets.newHashSet(backupdir.listFiles()));
-        assertEquals("backupdirCorrect contained 1 file before import, should be empty after import", 0, countFiles(backupdirCorrect));
+        if (copy)
+        {
+            assertEquals("backupdirCorrect contained 1 file before import, should contain 1 after import too", 1, countFiles(backupdirCorrect));
+        }
+        else
+        {
+            assertEquals("backupdirCorrect contained 1 file before import, should be empty after import", 0, countFiles(backupdirCorrect));
+        }
+
     }
 
     private int countFiles(File dir)
@@ -344,13 +375,25 @@ public class ImportTest extends CQLTester
     @Test
     public void testImportCorrupt() throws Throwable
     {
-        testCorruptHelper(true);
+        testCorruptHelper(true, false);
+    }
+
+    @Test
+    public void testImportCorruptWithCopying() throws Throwable
+    {
+        testCorruptHelper(true, true);
     }
 
     @Test
     public void testImportCorruptWithoutValidation() throws Throwable
     {
-        testCorruptHelper(false);
+        testCorruptHelper(false, false);
+    }
+
+    @Test
+    public void testImportCorruptWithoutValidationWithCopying() throws Throwable
+    {
+        testCorruptHelper(false, true);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index c8bdd09..0b64028 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -707,7 +707,7 @@ public class SSTableReaderTest
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
         SSTableReader sstable = getNewSSTable(cfs);
         Descriptor notLiveDesc = new Descriptor(new File("/tmp"), "", "", 0);
-        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components);
+        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false);
     }
 
     @Test(expected = RuntimeException.class)
@@ -717,7 +717,7 @@ public class SSTableReaderTest
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
         SSTableReader sstable = getNewSSTable(cfs);
         Descriptor notLiveDesc = new Descriptor(new File("/tmp"), "", "", 0);
-        SSTableReader.moveAndOpenSSTable(cfs, notLiveDesc, sstable.descriptor, sstable.components);
+        SSTableReader.moveAndOpenSSTable(cfs, notLiveDesc, sstable.descriptor, sstable.components, false);
     }
 
     @Test
@@ -738,7 +738,7 @@ public class SSTableReaderTest
             assertFalse(f.exists());
             assertTrue(new File(sstable.descriptor.filenameFor(c)).exists());
         }
-        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components);
+        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false);
         // make sure the files were moved:
         for (Component c : sstable.components)
         {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
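
For reference, a minimal sketch of how the copy-based import introduced by this commit can be driven programmatically, mirroring the SSTableImporter.Options builder and SSTableImporter calls used in the new ImportTest cases above. The ColumnFamilyStore handle `cfs` and the source directory path are assumed here and are not part of the patch:

    import java.util.List;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.SSTableImporter;

    public class CopyImportSketch
    {
        // Sketch only: 'cfs' is an already-open ColumnFamilyStore for the target table,
        // and 'backupDir' points at a directory containing the sstables to import.
        public static List<String> importByCopying(ColumnFamilyStore cfs, String backupDir)
        {
            // copyData(true) selects the new hardlink/copy path in SSTableReader.moveAndOpenSSTable,
            // so the source files in backupDir are left in place after a successful import.
            SSTableImporter.Options options = SSTableImporter.Options.options(backupDir)
                                                                     .copyData(true)
                                                                     .verifySSTables(true)
                                                                     .build();
            SSTableImporter importer = new SSTableImporter(cfs);
            // returns the list of directories that failed to import (empty on success)
            return importer.importNewSSTables(options);
        }
    }

The same behaviour is exposed operationally through the new `--copy-data` (`-p`) flag that this commit adds to `nodetool import` in Import.java.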
