Introduce safer durable sstable membership management (and simplify cleanup of compaction leftovers)
Instead of using temporary files and system tables, this patch introduces a simple transaction log for sstable membership edits that can be committed/aborted atomically and simply replayed on startup. patch by stefania; reviewed by benedict for CASSANDRA-7066 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b09e60f7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b09e60f7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b09e60f7 Branch: refs/heads/trunk Commit: b09e60f72bb2f37235d9e9190c25db36371b3c18 Parents: e338d2f Author: Stefania Alborghetti <[email protected]> Authored: Mon Apr 27 14:38:53 2015 +0800 Committer: Benedict Elliott Smith <[email protected]> Committed: Fri Jul 24 14:41:51 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 4 + bin/sstablelister | 55 ++ bin/sstablelister.bat | 41 + build.xml | 4 +- .../org/apache/cassandra/config/Config.java | 2 +- .../cassandra/config/DatabaseDescriptor.java | 2 +- .../apache/cassandra/db/ColumnFamilyStore.java | 149 +--- .../org/apache/cassandra/db/Directories.java | 57 +- src/java/org/apache/cassandra/db/Memtable.java | 31 +- .../org/apache/cassandra/db/SystemKeyspace.java | 126 +-- .../db/compaction/CompactionController.java | 2 +- .../db/compaction/CompactionManager.java | 22 +- .../cassandra/db/compaction/CompactionTask.java | 24 +- .../db/compaction/LeveledCompactionTask.java | 8 +- .../cassandra/db/compaction/OperationType.java | 18 +- .../db/compaction/SSTableSplitter.java | 6 +- .../cassandra/db/compaction/Scrubber.java | 18 +- .../SizeTieredCompactionStrategy.java | 6 +- .../cassandra/db/compaction/Upgrader.java | 13 +- .../writers/CompactionAwareWriter.java | 10 +- .../writers/DefaultCompactionWriter.java | 11 +- .../writers/MajorLeveledCompactionWriter.java | 15 +- .../writers/MaxSSTableSizeWriter.java | 25 +- .../SplittingSizeTieredCompactionWriter.java | 39 
+- .../apache/cassandra/db/lifecycle/Helpers.java | 58 +- .../db/lifecycle/LifecycleTransaction.java | 150 +++- .../apache/cassandra/db/lifecycle/Tracker.java | 50 +- .../cassandra/db/lifecycle/TransactionLogs.java | 786 +++++++++++++++++++ .../io/sstable/AbstractSSTableSimpleWriter.java | 8 +- .../apache/cassandra/io/sstable/Descriptor.java | 107 ++- .../io/sstable/IndexSummaryManager.java | 7 +- .../apache/cassandra/io/sstable/SSTable.java | 17 +- .../io/sstable/SSTableDeletingTask.java | 130 --- .../cassandra/io/sstable/SSTableLoader.java | 22 +- .../cassandra/io/sstable/SSTableRewriter.java | 20 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 7 +- .../io/sstable/SSTableSimpleWriter.java | 5 +- .../cassandra/io/sstable/SSTableTxnWriter.java | 101 +++ .../io/sstable/format/SSTableReader.java | 171 ++-- .../io/sstable/format/SSTableWriter.java | 50 +- .../io/sstable/format/big/BigFormat.java | 12 +- .../io/sstable/format/big/BigTableWriter.java | 69 +- .../io/sstable/metadata/MetadataSerializer.java | 7 +- .../org/apache/cassandra/io/util/FileUtils.java | 65 ++ .../cassandra/io/util/SequentialWriter.java | 12 +- .../cassandra/service/CassandraDaemon.java | 16 +- .../apache/cassandra/service/GCInspector.java | 10 +- .../apache/cassandra/service/StartupChecks.java | 6 +- .../cassandra/service/StorageService.java | 4 +- .../cassandra/streaming/StreamLockfile.java | 128 --- .../cassandra/streaming/StreamReader.java | 4 +- .../cassandra/streaming/StreamReceiveTask.java | 20 +- .../cassandra/streaming/StreamSession.java | 7 + .../cassandra/tools/StandaloneLister.java | 214 +++++ .../cassandra/tools/StandaloneScrubber.java | 5 +- .../cassandra/tools/StandaloneSplitter.java | 10 +- .../cassandra/tools/StandaloneUpgrader.java | 19 +- .../utils/concurrent/Transactional.java | 25 +- .../la-1-big-CompressionInfo.db | Bin 0 -> 43 bytes .../la-1-big-Data.db | Bin 0 -> 93 bytes .../la-1-big-Digest.adler32 | 1 + .../la-1-big-Filter.db | Bin 0 -> 16 bytes 
.../la-1-big-Index.db | Bin 0 -> 54 bytes .../la-1-big-Statistics.db | Bin 0 -> 4442 bytes .../la-1-big-Summary.db | Bin 0 -> 80 bytes .../la-1-big-TOC.txt | 8 + .../tmp-la-2-big-Data.db | Bin 0 -> 93 bytes .../tmp-la-2-big-Index.db | Bin 0 -> 54 bytes .../tmplink-la-2-big-Data.db | Bin 0 -> 93 bytes .../tmplink-la-2-big-Index.db | Bin 0 -> 54 bytes .../manifest.json | 1 + .../manifest.json | 1 + .../manifest.json | 1 + .../io/sstable/CQLSSTableWriterLongTest.java | 5 +- test/unit/org/apache/cassandra/MockSchema.java | 12 +- test/unit/org/apache/cassandra/Util.java | 3 - .../org/apache/cassandra/cql3/CQLTester.java | 2 +- .../cassandra/db/ColumnFamilyStoreTest.java | 150 +--- .../apache/cassandra/db/DirectoriesTest.java | 24 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 7 +- .../apache/cassandra/db/SystemKeyspaceTest.java | 48 ++ .../db/compaction/AntiCompactionTest.java | 4 +- .../compaction/CompactionAwareWriterTest.java | 8 +- .../db/compaction/CompactionsTest.java | 69 +- .../LeveledCompactionStrategyTest.java | 5 +- .../cassandra/db/lifecycle/HelpersTest.java | 25 +- .../db/lifecycle/LifecycleTransactionTest.java | 9 +- .../db/lifecycle/RealTransactionsTest.java | 228 ++++++ .../cassandra/db/lifecycle/TrackerTest.java | 40 +- .../db/lifecycle/TransactionLogsTest.java | 558 +++++++++++++ .../io/sstable/BigTableWriterTest.java | 27 +- .../io/sstable/CQLSSTableWriterClientTest.java | 9 + .../cassandra/io/sstable/DescriptorTest.java | 18 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 123 ++- .../io/sstable/SSTableRewriterTest.java | 173 ++-- .../cassandra/io/sstable/SSTableUtils.java | 11 +- .../metadata/MetadataSerializerTest.java | 2 +- .../org/apache/cassandra/schema/DefsTest.java | 4 +- .../streaming/StreamTransferTaskTest.java | 3 +- .../concurrent/AbstractTransactionalTest.java | 31 +- 101 files changed, 3265 insertions(+), 1357 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 70b26f5..a0dadc3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,9 @@ 3.0 +<<<<<<< HEAD * Implement proper sandboxing for UDFs (CASSANDRA-9402) +======= + * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066) +>>>>>>> Introduce safer durable sstable membership management * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850) * Metrics should use up to date nomenclature (CASSANDRA-9448) * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/bin/sstablelister ---------------------------------------------------------------------- diff --git a/bin/sstablelister b/bin/sstablelister new file mode 100755 index 0000000..a79409d --- /dev/null +++ b/bin/sstablelister @@ -0,0 +1,55 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +if [ "x$CASSANDRA_INCLUDE" = "x" ]; then + for include in /usr/share/cassandra/cassandra.in.sh \ + /usr/local/share/cassandra/cassandra.in.sh \ + /opt/cassandra/cassandra.in.sh \ + ~/.cassandra.in.sh \ + "`dirname "$0"`/cassandra.in.sh"; do + if [ -r "$include" ]; then + . "$include" + break + fi + done +elif [ -r "$CASSANDRA_INCLUDE" ]; then + . "$CASSANDRA_INCLUDE" +fi + +# Use JAVA_HOME if set, otherwise look for java in PATH +if [ -x "$JAVA_HOME/bin/java" ]; then + JAVA="$JAVA_HOME/bin/java" +else + JAVA="`which java`" +fi + +if [ -z "$CLASSPATH" ]; then + echo "You must set the CLASSPATH var" >&2 + exit 1 +fi + +if [ "x$MAX_HEAP_SIZE" = "x" ]; then + MAX_HEAP_SIZE="256M" +fi + +"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \ + -Dcassandra.storagedir="$cassandra_storagedir" \ + -Dlogback.configurationFile=logback-tools.xml \ + org.apache.cassandra.tools.StandaloneLister "$@" + +# vi:ai sw=4 ts=4 tw=0 et http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/bin/sstablelister.bat ---------------------------------------------------------------------- diff --git a/bin/sstablelister.bat b/bin/sstablelister.bat new file mode 100644 index 0000000..cb50a08 --- /dev/null +++ b/bin/sstablelister.bat @@ -0,0 +1,41 @@ +@REM +@REM Licensed to the Apache Software Foundation (ASF) under one or more +@REM contributor license agreements. See the NOTICE file distributed with +@REM this work for additional information regarding copyright ownership. +@REM The ASF licenses this file to You under the Apache License, Version 2.0 +@REM (the "License"); you may not use this file except in compliance with +@REM the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, software +@REM distributed under the License is distributed on an "AS IS" BASIS, +@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+@REM See the License for the specific language governing permissions and +@REM limitations under the License. + +@echo off +if "%OS%" == "Windows_NT" setlocal + +pushd "%~dp0" +call cassandra.in.bat + +if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneLister +if NOT DEFINED JAVA_HOME goto :err + +REM ***** JAVA options ***** +set JAVA_OPTS=^ + -Dlogback.configurationFile=logback-tools.xml + +set TOOLS_PARAMS= + +"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %* +goto finally + +:err +echo JAVA_HOME environment variable must be set! +pause + +:finally + +ENDLOCAL http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 5a82d64..e6581ea 100644 --- a/build.xml +++ b/build.xml @@ -1214,7 +1214,7 @@ <jvmarg value="-Xss256k"/> <jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/> <jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/> - <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/> + <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/> <jvmarg value="-Dcassandra.test.sstableformatdevelopment=true"/> <jvmarg value="-Dcassandra.testtag=@{testtag}"/> <jvmarg value="-Dcassandra.keepBriefBrief=${cassandra.keepBriefBrief}" /> @@ -1882,7 +1882,7 @@ <option name="MAIN_CLASS_NAME" value="" /> <option name="METHOD_NAME" value="" /> <option name="TEST_OBJECT" value="class" /> - <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -ea" /> + <option name="VM_PARAMETERS" value="-Dcassandra.debugrefcount=true -Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml 
-Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Dcorrupt-sstable-root=$PROJECT_DIR$/test/data/corrupt-sstables -Dmigration-sstable-root=${test.data}/migration-sstables -ea" /> <option name="PARAMETERS" value="" /> <option name="WORKING_DIRECTORY" value="" /> <option name="ENV_VARIABLES" /> http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 2a2062e..fe6752f 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -154,7 +154,7 @@ public class Config public volatile Integer stream_throughput_outbound_megabits_per_sec = 200; public volatile Integer inter_dc_stream_throughput_outbound_megabits_per_sec = 0; - public String[] data_file_directories; + public String[] data_file_directories = new String[0]; public String saved_caches_directory; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 01b1633..a25af65 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -490,7 +490,7 @@ public class DatabaseDescriptor throw new ConfigurationException("saved_caches_directory is missing and -Dcassandra.storagedir is not set", false); conf.saved_caches_directory += File.separator + "saved_caches"; } - if (conf.data_file_directories == null) + if (conf.data_file_directories == null || conf.data_file_directories.length == 0) { String defaultDataDir = System.getProperty("cassandra.storagedir", null); if 
(defaultDataDir == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index d3ad4e6..8d14120 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -60,18 +60,14 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.*; -import org.apache.cassandra.io.sstable.metadata.CompactionMetadata; -import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.streaming.StreamLockfile; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.*; import org.apache.cassandra.utils.TopKSampler.SamplerResult; @@ -522,45 +518,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { Directories directories = new Directories(metadata); - // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) + // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) clearEphemeralSnapshots(directories); - // remove any left-behind SSTables from failed/stalled streaming - FileFilter filter = new FileFilter() - { - public boolean accept(File 
pathname) - { - return pathname.getPath().endsWith(StreamLockfile.FILE_EXT); - } - }; - for (File dir : directories.getCFDirectories()) - { - File[] lockfiles = dir.listFiles(filter); - // lock files can be null if I/O error happens - if (lockfiles == null || lockfiles.length == 0) - continue; - logger.info("Removing SSTables from failed streaming session. Found {} files to cleanup.", lockfiles.length); - - for (File lockfile : lockfiles) - { - StreamLockfile streamLockfile = new StreamLockfile(lockfile); - streamLockfile.cleanup(); - streamLockfile.delete(); - } - } - - logger.debug("Removing compacted SSTable files from {} (see http://wiki.apache.org/cassandra/MemtableSSTable)", metadata.cfName); + logger.debug("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName); + LifecycleTransaction.removeUnfinishedLeftovers(metadata); + logger.debug("Further extra check for orphan sstable files for {}", metadata.cfName); for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) { Descriptor desc = sstableFiles.getKey(); Set<Component> components = sstableFiles.getValue(); - if (desc.type.isTemporary) - { - SSTable.delete(desc, components); - continue; - } + for (File tmpFile : desc.getTemporaryFiles()) + tmpFile.delete(); File dataFile = new File(desc.filenameFor(Component.DATA)); if (components.contains(Component.DATA) && dataFile.length() > 0) @@ -571,7 +542,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean logger.warn("Removing orphans for {}: {}", desc, components); for (Component component : components) { - FileUtils.deleteWithConfirm(desc.filenameFor(component)); + File file = new File(desc.filenameFor(component)); + if (file.exists()) + FileUtils.deleteWithConfirm(desc.filenameFor(component)); } } @@ -600,91 +573,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - /** - * Replacing compacted sstables is atomic as far as observers of
Tracker are concerned, but not on the - * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then - * their ancestors are removed. - * - * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their - * ancestors "live" in the system. This is harmless for normal data, but for counters it can cause overcounts. - * - * To prevent this, we record sstables being compacted in the system keyspace. If we find unfinished - * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple - * sstables from any given ancestor). - */ - public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map<Integer, UUID> unfinishedCompactions) - { - Directories directories = new Directories(metadata); - - Set<Integer> allGenerations = new HashSet<>(); - for (Descriptor desc : directories.sstableLister().list().keySet()) - allGenerations.add(desc.generation); - - // sanity-check unfinishedCompactions - Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet(); - if (!allGenerations.containsAll(unfinishedGenerations)) - { - HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations); - missingGenerations.removeAll(allGenerations); - logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}", - metadata.ksName, metadata.cfName, missingGenerations); - } - - // remove new sstables from compactions that didn't complete, and compute - // set of ancestors that shouldn't exist anymore - Set<Integer> completedAncestors = new HashSet<>(); - for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().skipTemporary(true).list().entrySet()) - { - Descriptor desc = sstableFiles.getKey(); - - Set<Integer> ancestors; - try - { - CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION); - ancestors = 
compactionMetadata.ancestors; - } - catch (IOException e) - { - throw new FSReadError(e, desc.filenameFor(Component.STATS)); - } - catch (NullPointerException e) - { - throw new FSReadError(e, "Failed to remove unfinished compaction leftovers (file: " + desc.filenameFor(Component.STATS) + "). See log for details."); - } - - if (!ancestors.isEmpty() - && unfinishedGenerations.containsAll(ancestors) - && allGenerations.containsAll(ancestors)) - { - // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one - UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next()); - assert compactionTaskID != null; - logger.debug("Going to delete unfinished compaction product {}", desc); - SSTable.delete(desc, sstableFiles.getValue()); - SystemKeyspace.finishCompaction(compactionTaskID); - } - else - { - completedAncestors.addAll(ancestors); - } - } - - // remove old sstables from compactions that did complete - for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) - { - Descriptor desc = sstableFiles.getKey(); - if (completedAncestors.contains(desc.generation)) - { - // if any of the ancestors were participating in a compaction, finish that compaction - logger.debug("Going to delete leftover compaction ancestor {}", desc); - SSTable.delete(desc, sstableFiles.getValue()); - UUID compactionTaskID = unfinishedCompactions.get(desc.generation); - if (compactionTaskID != null) - SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation)); - } - } - } - // must be called after all sstables are loaded since row cache merges all row versions public void initRowCache() { @@ -750,8 +638,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (currentDescriptors.contains(descriptor)) continue; // old (initialized) SSTable found, skipping - if (descriptor.type.isTemporary) // in the process of being written - continue; if (!descriptor.isCompatible()) 
throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s", @@ -780,7 +666,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean descriptor.ksname, descriptor.cfname, fileIndexGenerator.incrementAndGet(), - Descriptor.Type.FINAL, descriptor.formatType); } while (new File(newDescriptor.filenameFor(Component.DATA)).exists()); @@ -851,24 +736,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return name; } - public String getTempSSTablePath(File directory) + public String getSSTablePath(File directory) { - return getTempSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat()); + return getSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat()); } - public String getTempSSTablePath(File directory, SSTableFormat.Type format) + public String getSSTablePath(File directory, SSTableFormat.Type format) { - return getTempSSTablePath(directory, format.info.getLatestVersion(), format); + return getSSTablePath(directory, format.info.getLatestVersion(), format); } - private String getTempSSTablePath(File directory, Version version, SSTableFormat.Type format) + private String getSSTablePath(File directory, Version version, SSTableFormat.Type format) { Descriptor desc = new Descriptor(version, directory, keyspace.getName(), name, fileIndexGenerator.incrementAndGet(), - Descriptor.Type.TEMP, format); return desc.filenameFor(Component.DATA); } @@ -1883,11 +1767,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return directories.getSnapshotDetails(); } - public boolean hasUnreclaimedSpace() - { - return metric.liveDiskSpaceUsed.getCount() < metric.totalDiskSpaceUsed.getCount(); - } - /** * @return the cached partition for @param key if it is already present in the cache. 
* Not that this will not readAndCache the parition if it is not present, nor http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 8b61c68..bede4c4 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -44,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.*; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; @@ -91,6 +92,7 @@ public class Directories public static final String BACKUPS_SUBDIR = "backups"; public static final String SNAPSHOT_SUBDIR = "snapshots"; + public static final String TRANSACTIONS_SUBDIR = "transactions"; public static final String SECONDARY_INDEX_NAME_SEPARATOR = "."; public static final DataDirectory[] dataDirectories; @@ -466,6 +468,35 @@ public class Directories } } + public static File getTransactionsDirectory(File folder) + { + return getOrCreate(folder, TRANSACTIONS_SUBDIR); + } + + public List<File> getExistingDirectories(String subFolder) + { + List<File> ret = new ArrayList<>(); + for (File dir : dataPaths) + { + File subDir = getExistingDirectory(dir, subFolder); + if (subDir != null) + ret.add(subDir); + + } + return ret; + } + + public static File getExistingDirectory(File folder, String subFolder) + { + File subDir = new File(folder, join(subFolder)); + if (subDir.exists()) + { + assert(subDir.isDirectory()); + return subDir; + } + return null; + } + public SSTableLister sstableLister() { return new SSTableLister(); @@ -521,6 +552,7 @@ public class Directories public class SSTableLister { private boolean skipTemporary; + 
private boolean onlyTemporary; private boolean includeBackups; private boolean onlyBackups; private int nbFiles; @@ -536,6 +568,14 @@ public class Directories return this; } + public SSTableLister onlyTemporary(boolean b) + { + if (filtered) + throw new IllegalStateException("list() has already been called"); + onlyTemporary = b; + return this; + } + public SSTableLister includeBackups(boolean b) { if (filtered) @@ -593,21 +633,25 @@ public class Directories if (snapshotName != null) { - getSnapshotDirectory(location, snapshotName).listFiles(getFilter()); + getSnapshotDirectory(location, snapshotName).listFiles(getFilter(location)); continue; } if (!onlyBackups) - location.listFiles(getFilter()); + location.listFiles(getFilter(location)); if (includeBackups) - getBackupsDirectory(location).listFiles(getFilter()); + getBackupsDirectory(location).listFiles(getFilter(location)); } filtered = true; } - private FileFilter getFilter() + private FileFilter getFilter(File location) { + final Set<File> temporaryFiles = skipTemporary || onlyTemporary + ? 
LifecycleTransaction.getTemporaryFiles(metadata, location) + : Collections.<File>emptySet(); + return new FileFilter() { // This function always return false since accepts adds to the components map @@ -624,7 +668,10 @@ public class Directories if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName)) return false; - if (skipTemporary && pair.left.type.isTemporary) + if (skipTemporary && temporaryFiles.contains(file)) + return false; + + if (onlyTemporary && !temporaryFiles.contains(file)) return false; Set<Component> previous = components.get(pair.left); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 71e03d5..ecaf063 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -28,6 +28,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.SSTableTxnWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -367,8 +370,7 @@ public class Memtable implements Comparable<Memtable> logger.info("Writing {}", Memtable.this.toString()); SSTableReader ssTable; - // errors when creating the writer that may leave empty temp files. 
- try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) + try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) { boolean trackContention = logger.isDebugEnabled(); int heavilyContendedRowCount = 0; @@ -400,10 +402,10 @@ public class Memtable implements Comparable<Memtable> { logger.info(String.format("Completed flushing %s (%s) for commitlog position %s", writer.getFilename(), - FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()), + FBUtilities.prettyPrintMemory(writer.getFilePointer()), context)); - // temp sstables should contain non-repaired data. + // sstables should contain non-repaired data. ssTable = writer.finish(true); } else @@ -421,18 +423,23 @@ public class Memtable implements Comparable<Memtable> } } - public SSTableWriter createFlushWriter(String filename, + public SSTableTxnWriter createFlushWriter(String filename, PartitionColumns columns, EncodingStats stats) { + // we operate "offline" here, as we expose the resulting reader consciously when done + // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction) + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata); MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context); - return SSTableWriter.create(Descriptor.fromFilename(filename), - (long)partitions.size(), - ActiveRepairService.UNREPAIRED_SSTABLE, - cfs.metadata, - cfs.partitioner, - sstableMetadataCollector, - new SerializationHeader(cfs.metadata, columns, stats)); + return new SSTableTxnWriter(txn, + SSTableWriter.create(Descriptor.fromFilename(filename), + (long)partitions.size(), + ActiveRepairService.UNREPAIRED_SSTABLE, + cfs.metadata, + cfs.partitioner, + sstableMetadataCollector, + new 
SerializationHeader(cfs.metadata, columns, stats), + txn)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 0957af6..6a4a847 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; -import com.google.common.base.Function; import com.google.common.collect.*; import com.google.common.io.ByteStreams; @@ -47,9 +46,10 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.metrics.RestorableMeter; @@ -97,7 +97,6 @@ public final class SystemKeyspace public static final String PEERS = "peers"; public static final String PEER_EVENTS = "peer_events"; public static final String RANGE_XFERS = "range_xfers"; - public static final String COMPACTIONS_IN_PROGRESS = "compactions_in_progress"; public static final String COMPACTION_HISTORY = "compaction_history"; public static final String SSTABLE_ACTIVITY = "sstable_activity"; public static final String SIZE_ESTIMATES = "size_estimates"; @@ -216,16 +215,6 @@ public final class SystemKeyspace + "requested_at 
timestamp," + "PRIMARY KEY ((token_bytes)))"); - private static final CFMetaData CompactionsInProgress = - compile(COMPACTIONS_IN_PROGRESS, - "unfinished compactions", - "CREATE TABLE %s (" - + "id uuid," - + "columnfamily_name text," - + "inputs set<int>," - + "keyspace_name text," - + "PRIMARY KEY ((id)))"); - private static final CFMetaData CompactionHistory = compile(COMPACTION_HISTORY, "week-long compaction history", @@ -408,7 +397,6 @@ public final class SystemKeyspace Peers, PeerEvents, RangeXfers, - CompactionsInProgress, CompactionHistory, SSTableActivity, SizeEstimates, @@ -485,81 +473,6 @@ public final class SystemKeyspace FBUtilities.getLocalAddress()); } - /** - * Write compaction log, except columfamilies under system keyspace. - * - * @param cfs cfs to compact - * @param toCompact sstables to compact - * @return compaction task id or null if cfs is under system keyspace - */ - public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact) - { - if (Schema.isSystemKeyspace(cfs.keyspace.getName())) - return null; - - UUID compactionId = UUIDGen.getTimeUUID(); - Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>() - { - public Integer apply(SSTableReader sstable) - { - return sstable.descriptor.generation; - } - }); - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); - forceBlockingFlush(COMPACTIONS_IN_PROGRESS); - return compactionId; - } - - /** - * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need - * to complete successfully for this to be called. 
- * @param taskId what was returned from {@code startCompaction} - */ - public static void finishCompaction(UUID taskId) - { - assert taskId != null; - - executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId); - forceBlockingFlush(COMPACTIONS_IN_PROGRESS); - } - - /** - * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the - * task ID of the compaction they were participating in. - */ - public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions() - { - String req = "SELECT * FROM system.%s"; - UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS)); - - Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>(); - for (UntypedResultSet.Row row : resultSet) - { - String keyspace = row.getString("keyspace_name"); - String columnfamily = row.getString("columnfamily_name"); - Set<Integer> inputs = row.getSet("inputs", Int32Type.instance); - UUID taskID = row.getUUID("id"); - - Pair<String, String> kscf = Pair.create(keyspace, columnfamily); - Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf); - if (generationToTaskID == null) - generationToTaskID = new HashMap<>(inputs.size()); - - for (Integer generation : inputs) - generationToTaskID.put(generation, taskID); - - unfinishedCompactions.put(kscf, generationToTaskID); - } - return unfinishedCompactions; - } - - public static void discardCompactionsInProgress() - { - ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS); - compactionLog.truncateBlocking(); - } - public static void updateCompactionHistory(String ksname, String cfname, long compactedAt, @@ -1227,7 +1140,7 @@ public final class SystemKeyspace * * @throws IOException */ - public static void snapshotOnVersionChange() throws IOException + public static boolean snapshotOnVersionChange() throws IOException { 
String previous = getPreviousVersionString(); String next = FBUtilities.getReleaseVersionString(); @@ -1242,7 +1155,10 @@ public final class SystemKeyspace next)); Keyspace systemKs = Keyspace.open(SystemKeyspace.NAME); systemKs.snapshot(snapshotName, null); + return true; } + + return false; } /** @@ -1282,6 +1198,36 @@ public final class SystemKeyspace return result.one().getString("release_version"); } + /** + * Check data directories for old files that can be removed when migrating from 2.2 to 3.0, + * these checks can be removed in 4.0, see CASSANDRA-7066 + */ + public static void migrateDataDirs() + { + Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()); + for (String dataDir : dirs) + { + logger.debug("Checking directory {} for old files", dataDir); + File dir = new File(dataDir); + assert dir.exists() : dir + " should have been created by startup checks"; + + for (File ksdir : dir.listFiles((d, n) -> d.isDirectory())) + { + for (File cfdir : ksdir.listFiles((d, n) -> d.isDirectory())) + { + if (Descriptor.isLegacyFile(cfdir.getName())) + { + FileUtils.deleteRecursive(cfdir); + } + else + { + FileUtils.delete(cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(n))); + } + } + } + } + } + private static ByteBuffer rangeToBytes(Range<Token> range) { try (DataOutputBuffer out = new DataOutputBuffer()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 303de15..df3bc4e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -63,7 +63,7 @@ public class CompactionController implements AutoCloseable refreshOverlaps(); } - 
void maybeRefreshOverlaps() + public void maybeRefreshOverlaps() { for (SSTableReader reader : overlappingSSTables) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 6cf2e18..616c310 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -823,7 +823,7 @@ public class CompactionManager implements CompactionManagerMBean CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { - writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable)); + writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, txn)); while (ci.hasNext()) { @@ -948,24 +948,27 @@ public class CompactionManager implements CompactionManagerMBean File compactionFileLocation, int expectedBloomFilterSize, long repairedAt, - SSTableReader sstable) + SSTableReader sstable, + LifecycleTransaction txn) { FileUtils.createDirectory(compactionFileLocation); return SSTableWriter.create(cfs.metadata, - Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)), + Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)), expectedBloomFilterSize, repairedAt, sstable.getSSTableLevel(), cfs.partitioner, - sstable.header); + sstable.header, + txn); } public static SSTableWriter 
createWriterForAntiCompaction(ColumnFamilyStore cfs, File compactionFileLocation, int expectedBloomFilterSize, long repairedAt, - Collection<SSTableReader> sstables) + Collection<SSTableReader> sstables, + LifecycleTransaction txn) { FileUtils.createDirectory(compactionFileLocation); int minLevel = Integer.MAX_VALUE; @@ -983,13 +986,14 @@ public class CompactionManager implements CompactionManagerMBean break; } } - return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)), + return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)), (long) expectedBloomFilterSize, repairedAt, cfs.metadata, cfs.partitioner, new MetadataCollector(sstables, cfs.metadata.comparator, minLevel), - SerializationHeader.make(cfs.metadata, sstables)); + SerializationHeader.make(cfs.metadata, sstables), + txn); } @@ -1198,8 +1202,8 @@ public class CompactionManager implements CompactionManagerMBean { int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); - repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); - unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); + repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, anticompactionGroup)); + unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, anticompactionGroup)); while (ci.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 6335834..7897a1a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.compaction; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -46,7 +45,6 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Refs; public class CompactionTask extends AbstractCompactionTask @@ -128,7 +126,7 @@ public class CompactionTask extends AbstractCompactionTask } }); - UUID taskId = SystemKeyspace.startCompaction(cfs, transaction.originals()); + UUID taskId = transaction.opId(); // new sstables from flush can be added during a compaction, but only the compaction can remove them, // so in our single-threaded compaction world this is a valid way of determining if we're compacting @@ -139,8 +137,8 @@ public class CompactionTask extends AbstractCompactionTask ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel())); } ssTableLoggerMsg.append("]"); - String taskIdLoggerMsg = taskId == null ? 
UUIDGen.getTimeUUID().toString() : taskId.toString(); - logger.info("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); + + logger.info("Compacting ({}) {}", taskId, ssTableLoggerMsg); long start = System.nanoTime(); long totalKeysWritten = 0; @@ -186,16 +184,11 @@ public class CompactionTask extends AbstractCompactionTask } } - // don't replace old sstables yet, as we need to mark the compaction finished in the system table + // point of no return newSStables = writer.finish(); } finally { - // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones - // (in replaceCompactedSSTables) - if (taskId != null) - SystemKeyspace.finishCompaction(taskId); - if (collector != null) collector.finishCompaction(ci); @@ -217,7 +210,7 @@ public class CompactionTask extends AbstractCompactionTask long totalSourceRows = 0; String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize); logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. 
Partition merge counts were {%s}", - taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); + taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); @@ -227,10 +220,11 @@ public class CompactionTask extends AbstractCompactionTask } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + LifecycleTransaction transaction, + Set<SSTableReader> nonExpiredSSTables) { - return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, compactionType); - + return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline); } public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index 1c3b686..d3d56ac 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -41,11 +41,13 @@ 
public class LeveledCompactionTask extends CompactionTask } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables) { if (majorCompaction) - return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, compactionType); - return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType); + return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false); + return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index a14f13f..5b6ce05 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -32,13 +32,27 @@ public enum OperationType TOMBSTONE_COMPACTION("Tombstone Compaction"), UNKNOWN("Unknown compaction type"), ANTICOMPACTION("Anticompaction after repair"), - VERIFY("Verify"); + VERIFY("Verify"), + FLUSH("Flush"), + STREAM("Stream"), + WRITE("Write"); - private final String type; + public final String type; + public final String fileName; OperationType(String type) { this.type = type; + this.fileName = type.toLowerCase().replace(" ", ""); + } + + public static OperationType fromFileName(String fileName) + { + for (OperationType opType : OperationType.values()) + if (opType.fileName.equals(fileName)) + return opType; + + throw new 
IllegalArgumentException("Invalid fileName for operation type: " + fileName); } public String toString() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java index e9a4f05..8f382ea 100644 --- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java +++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java @@ -74,9 +74,11 @@ public class SSTableSplitter { } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables) { - return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType); + return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 94f3af7..c853157 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -59,6 +59,7 @@ public class Scrubber implements Closeable private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; private final boolean isOffline; + private final boolean keepOriginals; private SSTableReader newSstable; private SSTableReader newInOrderSstable; @@ -85,11 +86,17 @@ 
public class Scrubber implements Closeable public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException { - this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData); + this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData, false); } @SuppressWarnings("resource") - public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException + public Scrubber(ColumnFamilyStore cfs, + LifecycleTransaction transaction, + boolean skipCorrupted, + OutputHandler outputHandler, + boolean isOffline, + boolean checkData, + boolean keepOriginals) throws IOException { this.cfs = cfs; this.transaction = transaction; @@ -97,6 +104,7 @@ public class Scrubber implements Closeable this.outputHandler = outputHandler; this.skipCorrupted = skipCorrupted; this.isOffline = isOffline; + this.keepOriginals = keepOriginals; this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, sstable.descriptor.version, sstable.header); @@ -149,7 +157,7 @@ public class Scrubber implements Closeable { outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length())); int nowInSec = FBUtilities.nowInSeconds(); - try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline)) + try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline).keepOriginals(isOffline)) { nextIndexKey = indexAvailable() ? 
ByteBufferUtil.readWithShortLength(indexFile) : null; if (indexAvailable()) @@ -159,7 +167,7 @@ public class Scrubber implements Closeable assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex; } - writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable)); + writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, transaction)); DecoratedKey prevKey = null; @@ -291,7 +299,7 @@ public class Scrubber implements Closeable { // out of order rows, but no bad rows found - we can keep our repairedAt time long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt; - try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);) + try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable, transaction)) { for (Partition partition : outOfOrder) inOrderWriter.append(partition.unfilteredIterator()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 74a9757..0ece341 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -347,9 +347,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> 
nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables) { - return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables, compactionType); + return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index e3764c8..be0dd2a 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -43,7 +43,6 @@ public class Upgrader private final LifecycleTransaction transaction; private final File directory; - private final OperationType compactionType = OperationType.UPGRADE_SSTABLES; private final CompactionController controller; private final CompactionStrategyManager strategyManager; private final long estimatedRows; @@ -80,23 +79,23 @@ public class Upgrader sstableMetadataCollector.addAncestor(i); } sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); - return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), + return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector, - SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable))); + SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)), + transaction); } - public void upgrade() + public void upgrade(boolean keepOriginals) { outputHandler.output("Upgrading " + sstable); - int nowInSec = FBUtilities.nowInSeconds(); - try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, 
CompactionTask.getMaxDataAge(transaction.originals()), true); + try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true).keepOriginals(keepOriginals); AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals()); - CompactionIterator iter = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) + CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); while (iter.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index 610592f..f8c73d3 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -42,16 +42,22 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected final long estimatedTotalKeys; protected final long maxAge; protected final long minRepairedAt; + + protected final LifecycleTransaction txn; protected final SSTableRewriter sstableWriter; - public CompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline) + public CompactionAwareWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables, + boolean offline) { this.cfs = cfs; this.nonExpiredSSTables = nonExpiredSSTables; this.estimatedTotalKeys = 
SSTableReader.getApproximateKeyCount(nonExpiredSSTables); this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables); this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables); - this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline); + this.txn = txn; + this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(offline); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 8fc7bec..cdacddc 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -27,14 +27,12 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; - /** * The default compaction writer - creates one output file in L0 */ @@ -43,20 +41,21 @@ public class DefaultCompactionWriter extends CompactionAwareWriter protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class); @SuppressWarnings("resource") - public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType 
compactionType) + public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline) { super(cfs, txn, nonExpiredSSTables, offline); logger.debug("Expected bloom filter size : {}", estimatedTotalKeys); - long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType); + long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)), + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), estimatedTotalKeys, minRepairedAt, cfs.metadata, cfs.partitioner, new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); sstableWriter.switchWriter(writer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 5328fa5..ad58967 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -28,7 +28,6 @@ import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.LeveledManifest; -import 
org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -49,12 +48,12 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter private final boolean skipAncestors; @SuppressWarnings("resource") - public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType) + public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline) { super(cfs, txn, nonExpiredSSTables, offline); this.maxSSTableSize = maxSSTableSize; this.allSSTables = txn.originals(); - expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType)); + expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())); long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize); long keysPerSSTable = estimatedTotalKeys / estimatedSSTables; File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); @@ -64,13 +63,14 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter logger.warn("Many sstables involved in compaction, skipping storing ancestor information to avoid running out of memory"); @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)), + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), keysPerSSTable, minRepairedAt, cfs.metadata, cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), - 
SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); sstableWriter.switchWriter(writer); } @@ -92,13 +92,14 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1)); File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)), + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), averageEstimatedKeysPerSSTable, minRepairedAt, cfs.metadata, cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); sstableWriter.switchWriter(writer); partitionsWritten = 0; sstablesWritten++; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 4832fd5..9902357 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -24,7 +24,6 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.db.compaction.OperationType; import 
org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -41,25 +40,26 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter private final Set<SSTableReader> allSSTables; @SuppressWarnings("resource") - public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType) + public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline) { super(cfs, txn, nonExpiredSSTables, offline); this.allSSTables = txn.originals(); this.level = level; this.maxSSTableSize = maxSSTableSize; - long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType); + long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); expectedWriteSize = Math.min(maxSSTableSize, totalSize); estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables); estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize); File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)), + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, level), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); sstableWriter.switchWriter(writer); } @@ -71,13 +71,14 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter { File sstableDirectory 
= cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)), - estimatedTotalKeys / estimatedSSTables, - minRepairedAt, - cfs.metadata, - cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, level), - SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)), + estimatedTotalKeys / estimatedSSTables, + minRepairedAt, + cfs.metadata, + cfs.partitioner, + new MetadataCollector(allSSTables, cfs.metadata.comparator, level), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); sstableWriter.switchWriter(writer); }
