http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java b/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java deleted file mode 100644 index 821f58c..0000000 --- a/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java +++ /dev/null @@ -1,786 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.lifecycle; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.StringUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.cassandra.utils.Throwables.merge; - -import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.CLibrary; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.concurrent.Blocker; -import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.concurrent.RefCounted; -import org.apache.cassandra.utils.concurrent.Transactional; - -/** - * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction, - * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. 
This is consistent - * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also - * *requires* that the prepareToCommit() phase only take actions that can be rolled back. - * - * A class that tracks sstable files involved in a transaction across sstables: - * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails. - * - * Two log files, NEW and OLD, contain new and old sstable files respectively. The log files also track each - * other by referencing each others path in the contents. - * - * If the transaction finishes successfully: - * - the OLD transaction file is deleted along with its contents, this includes the NEW transaction file. - * Before deleting we must let the SSTableTidier instances run first for any old readers that are being obsoleted - * (mark as compacted) by the transaction, see LifecycleTransaction - * - * If the transaction is aborted: - * - the NEW transaction file and its contents are deleted, this includes the OLD transaction file - * - * On start-up: - * - If we find a NEW transaction file, it means the transaction did not complete and we delete the NEW file and its contents - * - If we find an OLD transaction file but not a NEW file, it means the transaction must have completed and so we delete - * all the contents of the OLD file, if they still exist, and the OLD file itself. - * - * See CASSANDRA-7066 for full details. - */ -public class TransactionLogs extends Transactional.AbstractTransactional implements Transactional -{ - private static final Logger logger = LoggerFactory.getLogger(TransactionLogs.class); - - /** - * A single transaction log file, either NEW or OLD. 
- */ - final static class TransactionFile - { - static String EXT = ".log"; - static char SEP = '_'; - static String REGEX_STR = String.format("^(.*)_(.*)_(%s|%s)%s$", Type.NEW.txt, Type.OLD.txt, EXT); - static Pattern REGEX = Pattern.compile(REGEX_STR); //(opname)_(id)_(new|old).data - - public enum Type - { - NEW (0, "new"), - OLD (1, "old"); - - public final int idx; - public final String txt; - - Type(int idx, String txt) - { - this.idx = idx; - this.txt = txt; - } - }; - - public final Type type; - public final File file; - public final TransactionData parent; - public final Set<String> lines = new HashSet<>(); - - public TransactionFile(Type type, TransactionData parent) - { - this.type = type; - this.file = new File(parent.getFileName(type)); - this.parent = parent; - - if (exists()) - lines.addAll(FileUtils.readLines(file)); - } - - public boolean add(SSTable table) - { - return add(table.descriptor.baseFilename()); - } - - private boolean add(String path) - { - String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), path); - if (lines.contains(relativePath)) - return false; - - lines.add(relativePath); - FileUtils.append(file, relativePath); - return true; - } - - public void remove(SSTable table) - { - String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename()); - assert lines.contains(relativePath) : String.format("%s is not tracked by %s", relativePath, file); - - lines.remove(relativePath); - delete(relativePath); - } - - public boolean contains(SSTable table) - { - String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename()); - return lines.contains(relativePath); - } - - private void deleteContents() - { - deleteOpposite(); - - // we sync the parent file descriptor between opposite log deletion and - // contents deletion to ensure there is a happens before edge between them - parent.sync(); - - lines.forEach(line -> delete(line)); - 
lines.clear(); - } - - private void deleteOpposite() - { - Type oppositeType = type == Type.NEW ? Type.OLD : Type.NEW; - String oppositeFile = FileUtils.getRelativePath(parent.getParentFolder(), parent.getFileName(oppositeType)); - assert lines.contains(oppositeFile) : String.format("Could not find %s amongst lines", oppositeFile); - - delete(oppositeFile); - lines.remove(oppositeFile); - } - - private void delete(String relativePath) - { - getTrackedFiles(relativePath).forEach(file -> TransactionLogs.delete(file)); - } - - public Set<File> getTrackedFiles() - { - Set<File> ret = new HashSet<>(); - FileUtils.readLines(file).forEach(line -> ret.addAll(getTrackedFiles(line))); - ret.add(file); - return ret; - } - - private List<File> getTrackedFiles(String relativePath) - { - List<File> ret = new ArrayList<>(); - File file = new File(StringUtils.join(parent.getParentFolder(), File.separator, relativePath)); - if (file.exists()) - ret.add(file); - else - ret.addAll(Arrays.asList(new File(parent.getParentFolder()).listFiles((dir, name) -> { - return name.startsWith(relativePath); - }))); - - return ret; - } - - public void delete(boolean deleteContents) - { - assert file.exists() : String.format("Expected %s to exists", file); - - if (deleteContents) - deleteContents(); - - // we sync the parent file descriptor between contents and log deletion - // to ensure there is a happens before edge between them - parent.sync(); - - TransactionLogs.delete(file); - } - - public boolean exists() - { - return file.exists(); - } - } - - /** - * We split the transaction data from the behavior because we need - * to reconstruct any left-overs and clean them up, as well as work - * out which files are temporary. So for these cases we don't want the full - * transactional behavior, plus it's handy for the TransactionTidier. 
- */ - final static class TransactionData implements AutoCloseable - { - private final OperationType opType; - private final UUID id; - private final File folder; - private final TransactionFile[] files; - private int folderDescriptor; - private boolean succeeded; - - static TransactionData make(File logFile) - { - Matcher matcher = TransactionFile.REGEX.matcher(logFile.getName()); - assert matcher.matches(); - - OperationType operationType = OperationType.fromFileName(matcher.group(1)); - UUID id = UUID.fromString(matcher.group(2)); - - return new TransactionData(operationType, logFile.getParentFile(), id); - } - - TransactionData(OperationType opType, File folder, UUID id) - { - this.opType = opType; - this.id = id; - this.folder = folder; - this.files = new TransactionFile[TransactionFile.Type.values().length]; - for (TransactionFile.Type t : TransactionFile.Type.values()) - this.files[t.idx] = new TransactionFile(t, this); - - this.folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath()); - this.succeeded = !newLog().exists() && oldLog().exists(); - } - - public void succeeded(boolean succeeded) - { - this.succeeded = succeeded; - } - - public void close() - { - if (folderDescriptor > 0) - { - CLibrary.tryCloseFD(folderDescriptor); - folderDescriptor = -1; - } - } - - void crossReference() - { - newLog().add(oldLog().file.getPath()); - oldLog().add(newLog().file.getPath()); - } - - void sync() - { - if (folderDescriptor > 0) - CLibrary.trySync(folderDescriptor); - } - - TransactionFile newLog() - { - return files[TransactionFile.Type.NEW.idx]; - } - - TransactionFile oldLog() - { - return files[TransactionFile.Type.OLD.idx]; - } - - OperationType getType() - { - return opType; - } - - UUID getId() - { - return id; - } - - Throwable removeUnfinishedLeftovers(Throwable accumulate) - { - try - { - if (succeeded) - oldLog().delete(true); - else - newLog().delete(true); - } - catch (Throwable t) - { - accumulate = merge(accumulate, t); - } - - return 
accumulate; - } - - Set<File> getTemporaryFiles() - { - sync(); - - if (newLog().exists()) - return newLog().getTrackedFiles(); - else - return oldLog().getTrackedFiles(); - } - - String getFileName(TransactionFile.Type type) - { - String fileName = StringUtils.join(opType.fileName, - TransactionFile.SEP, - id.toString(), - TransactionFile.SEP, - type.txt, - TransactionFile.EXT); - return StringUtils.join(folder, File.separator, fileName); - } - - String getParentFolder() - { - return folder.getParent(); - } - - static boolean isLogFile(String name) - { - return TransactionFile.REGEX.matcher(name).matches(); - } - } - - private final Tracker tracker; - private final TransactionData data; - private final Ref<TransactionLogs> selfRef; - // Deleting sstables is tricky because the mmapping might not have been finalized yet, - // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs). - // Additionally, we need to make sure to delete the data file first, so on restart the others - // will be recognized as GCable. 
- private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>(); - private static final Blocker blocker = new Blocker(); - - TransactionLogs(OperationType opType, CFMetaData metadata) - { - this(opType, metadata, null); - } - - TransactionLogs(OperationType opType, CFMetaData metadata, Tracker tracker) - { - this(opType, new Directories(metadata), tracker); - } - - TransactionLogs(OperationType opType, Directories directories, Tracker tracker) - { - this(opType, directories.getDirectoryForNewSSTables(), tracker); - } - - TransactionLogs(OperationType opType, File folder, Tracker tracker) - { - this.tracker = tracker; - this.data = new TransactionData(opType, - Directories.getTransactionsDirectory(folder), - UUIDGen.getTimeUUID()); - this.selfRef = new Ref<>(this, new TransactionTidier(data)); - - data.crossReference(); - if (logger.isDebugEnabled()) - logger.debug("Created transaction logs with id {}", data.id); - } - - /** - * Track a reader as new. - **/ - void trackNew(SSTable table) - { - if (!data.newLog().add(table)) - throw new IllegalStateException(table + " is already tracked as new"); - - data.newLog().add(table); - } - - /** - * Stop tracking a reader as new. - */ - void untrackNew(SSTable table) - { - data.newLog().remove(table); - } - - /** - * Schedule a reader for deletion as soon as it is fully unreferenced and the transaction - * has been committed. 
- */ - SSTableTidier obsoleted(SSTableReader reader) - { - if (data.newLog().contains(reader)) - { - if (data.oldLog().contains(reader)) - throw new IllegalArgumentException(); - - return new SSTableTidier(reader, true, this); - } - - if (!data.oldLog().add(reader)) - throw new IllegalStateException(); - - if (tracker != null) - tracker.notifyDeleting(reader); - - return new SSTableTidier(reader, false, this); - } - - OperationType getType() - { - return data.getType(); - } - - UUID getId() - { - return data.getId(); - } - - @VisibleForTesting - String getDataFolder() - { - return data.getParentFolder(); - } - - @VisibleForTesting - String getLogsFolder() - { - return StringUtils.join(getDataFolder(), File.separator, Directories.TRANSACTIONS_SUBDIR); - } - - @VisibleForTesting - TransactionData getData() - { - return data; - } - - private static void delete(File file) - { - try - { - if (logger.isDebugEnabled()) - logger.debug("Deleting {}", file); - - Files.delete(file.toPath()); - } - catch (NoSuchFileException e) - { - logger.error("Unable to delete {} as it does not exist", file); - } - catch (IOException e) - { - logger.error("Unable to delete {}", file, e); - throw new RuntimeException(e); - } - } - - /** - * The transaction tidier. - * - * When the transaction reference is fully released we try to delete all the obsolete files - * depending on the transaction result. 
- */ - private static class TransactionTidier implements RefCounted.Tidy, Runnable - { - private final TransactionData data; - - public TransactionTidier(TransactionData data) - { - this.data = data; - } - - public void tidy() throws Exception - { - run(); - } - - public String name() - { - return data.id.toString(); - } - - public void run() - { - if (logger.isDebugEnabled()) - logger.debug("Removing files for transaction {}", name()); - - Throwable err = data.removeUnfinishedLeftovers(null); - - if (err != null) - { - logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err); - failedDeletions.add(this); - } - else - { - if (logger.isDebugEnabled()) - logger.debug("Closing file transaction {}", name()); - data.close(); - } - } - } - - static class Obsoletion - { - final SSTableReader reader; - final SSTableTidier tidier; - - public Obsoletion(SSTableReader reader, SSTableTidier tidier) - { - this.reader = reader; - this.tidier = tidier; - } - } - - /** - * The SSTableReader tidier. When a reader is fully released and no longer referenced - * by any one, we run this. It keeps a reference to the parent transaction and releases - * it when done, so that the final transaction cleanup can run when all obsolete readers - * are released. 
- */ - public static class SSTableTidier implements Runnable - { - // must not retain a reference to the SSTableReader, else leak detection cannot kick in - private final Descriptor desc; - private final long sizeOnDisk; - private final Tracker tracker; - private final boolean wasNew; - private final Ref<TransactionLogs> parentRef; - - public SSTableTidier(SSTableReader referent, boolean wasNew, TransactionLogs parent) - { - this.desc = referent.descriptor; - this.sizeOnDisk = referent.bytesOnDisk(); - this.tracker = parent.tracker; - this.wasNew = wasNew; - this.parentRef = parent.selfRef.tryRef(); - } - - public void run() - { - blocker.ask(); - - SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); - - try - { - // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier - File datafile = new File(desc.filenameFor(Component.DATA)); - - delete(datafile); - // let the remainder be cleaned up by delete - SSTable.delete(desc, SSTable.discoverComponentsFor(desc)); - } - catch (Throwable t) - { - logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc); - failedDeletions.add(this); - return; - } - - if (tracker != null && tracker.cfstore != null && !wasNew) - tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk); - - // release the referent to the parent so that the all transaction files can be released - parentRef.release(); - } - - public void abort() - { - parentRef.release(); - } - } - - /** - * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.) - * Useful because there are times when we know GC has been invoked; also exposed as an mbean. 
- */ - public static void rescheduleFailedDeletions() - { - Runnable task; - while ( null != (task = failedDeletions.poll())) - ScheduledExecutors.nonPeriodicTasks.submit(task); - } - - /** - * Deletions run on the nonPeriodicTasks executor, (both failedDeletions or global tidiers in SSTableReader) - * so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed. - */ - public static void waitForDeletions() - { - FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(() -> { - }, 0, TimeUnit.MILLISECONDS)); - } - - @VisibleForTesting - public static void pauseDeletions(boolean stop) - { - blocker.block(stop); - } - - private Throwable complete(Throwable accumulate) - { - try - { - try - { - if (data.succeeded) - data.newLog().delete(false); - else - data.oldLog().delete(false); - } - catch (Throwable t) - { - accumulate = merge(accumulate, t); - } - - accumulate = selfRef.ensureReleased(accumulate); - return accumulate; - } - catch (Throwable t) - { - logger.error("Failed to complete file transaction {}", getId(), t); - return Throwables.merge(accumulate, t); - } - } - - protected Throwable doCommit(Throwable accumulate) - { - data.succeeded(true); - return complete(accumulate); - } - - protected Throwable doAbort(Throwable accumulate) - { - data.succeeded(false); - return complete(accumulate); - } - - protected void doPrepare() { } - - /** - * Called on startup to scan existing folders for any unfinished leftovers of - * operations that were ongoing when the process exited. - * - * We check if the new transaction file exists first, and if so we clean it up - * along with its contents, which includes the old file, else if only the old file exists - * it means the operation has completed and we only cleanup the old file with its contents. 
- */ - static void removeUnfinishedLeftovers(CFMetaData metadata) - { - Throwable accumulate = null; - Set<UUID> ids = new HashSet<>(); - - for (File dir : getFolders(metadata, null)) - { - File[] logs = dir.listFiles((dir1, name) -> { - return TransactionData.isLogFile(name); - }); - - for (File log : logs) - { - try (TransactionData data = TransactionData.make(log)) - { - // we need to check this because there are potentially 2 log files per operation - if (ids.contains(data.id)) - continue; - - ids.add(data.id); - accumulate = data.removeUnfinishedLeftovers(accumulate); - } - } - } - - if (accumulate != null) - logger.error("Failed to remove unfinished transaction leftovers", accumulate); - } - - /** - * Return a set of files that are temporary, that is they are involved with - * a transaction that hasn't completed yet. - * - * Only return the files that exist and that are located in the folder - * specified as a parameter or its sub-folders. - */ - static Set<File> getTemporaryFiles(CFMetaData metadata, File folder) - { - Set<File> ret = new HashSet<>(); - Set<UUID> ids = new HashSet<>(); - - for (File dir : getFolders(metadata, folder)) - { - File[] logs = dir.listFiles((dir1, name) -> { - return TransactionData.isLogFile(name); - }); - - for (File log : logs) - { - try(TransactionData data = TransactionData.make(log)) - { - // we need to check this because there are potentially 2 log files per transaction - if (ids.contains(data.id)) - continue; - - ids.add(data.id); - ret.addAll(data.getTemporaryFiles() - .stream() - .filter(file -> FileUtils.isContained(folder, file)) - .collect(Collectors.toSet())); - } - } - } - - return ret; - } - - /** - * Return the transaction log files that currently exist for this table. 
- */ - static Set<File> getLogFiles(CFMetaData metadata) - { - Set<File> ret = new HashSet<>(); - for (File dir : getFolders(metadata, null)) - ret.addAll(Arrays.asList(dir.listFiles((dir1, name) -> { - return TransactionData.isLogFile(name); - }))); - - return ret; - } - - /** - * A utility method to work out the existing transaction sub-folders - * either for a table, or a specific parent folder, or both. - */ - private static List<File> getFolders(CFMetaData metadata, File folder) - { - List<File> ret = new ArrayList<>(); - if (metadata != null) - { - Directories directories = new Directories(metadata); - ret.addAll(directories.getExistingDirectories(Directories.TRANSACTIONS_SUBDIR)); - } - - if (folder != null) - { - File opDir = Directories.getExistingDirectory(folder, Directories.TRANSACTIONS_SUBDIR); - if (opDir != null) - ret.add(opDir); - } - - return ret; - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 20c3962..3286522 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -18,7 +18,6 @@ package org.apache.cassandra.io.sstable; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.net.InetAddress; import java.util.*; @@ -27,16 +26,13 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.*; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.Pair; @@ -77,89 +73,79 @@ public class SSTableLoader implements StreamEventHandler { outputHandler.output("Opening sstables and calculating sections to stream"); - directory.list(new FilenameFilter() - { - final Map<File, Set<File>> allTemporaryFiles = new HashMap<>(); - public boolean accept(File dir, String name) - { - File file = new File(dir, name); - - if (file.isDirectory()) - return false; - - Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name); - Descriptor desc = p == null ? 
null : p.left; - if (p == null || !p.right.equals(Component.DATA)) - return false; - - if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists()) - { - outputHandler.output(String.format("Skipping file %s because index is missing", name)); - return false; - } - - CFMetaData metadata = client.getTableMetadata(desc.cfname); - if (metadata == null) - { - outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname)); - return false; - } - - Set<File> temporaryFiles = allTemporaryFiles.get(dir); - if (temporaryFiles == null) - { - temporaryFiles = LifecycleTransaction.getTemporaryFiles(metadata, dir); - allTemporaryFiles.put(dir, temporaryFiles); - } - - if (temporaryFiles.contains(file)) - { - outputHandler.output(String.format("Skipping temporary file %s", name)); - return false; - } - - Set<Component> components = new HashSet<>(); - components.add(Component.DATA); - components.add(Component.PRIMARY_INDEX); - if (new File(desc.filenameFor(Component.SUMMARY)).exists()) - components.add(Component.SUMMARY); - if (new File(desc.filenameFor(Component.COMPRESSION_INFO)).exists()) - components.add(Component.COMPRESSION_INFO); - if (new File(desc.filenameFor(Component.STATS)).exists()) - components.add(Component.STATS); - - try - { - // To conserve memory, open SSTableReaders without bloom filters and discard - // the index summary after calculating the file sections to stream and the estimated - // number of keys for each endpoint. See CASSANDRA-5555 for details. 
- SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata); - sstables.add(sstable); - - // calculate the sstable sections to stream as well as the estimated number of - // keys per host - for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : ranges.entrySet()) - { - InetAddress endpoint = entry.getKey(); - Collection<Range<Token>> tokenRanges = entry.getValue(); - - List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges); - long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges); - Ref<SSTableReader> ref = sstable.ref(); - StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE); - streamingDetails.put(endpoint, details); - } - - // to conserve heap space when bulk loading - sstable.releaseSummary(); - } - catch (IOException e) - { - outputHandler.output(String.format("Skipping file %s, error opening it: %s", name, e.getMessage())); - } - return false; - } - }); + LifecycleTransaction.getFiles(directory.toPath(), + (file, type) -> + { + File dir = file.getParentFile(); + String name = file.getName(); + + if (type != Directories.FileType.FINAL) + { + outputHandler.output(String.format("Skipping temporary file %s", name)); + return false; + } + + Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name); + Descriptor desc = p == null ? 
null : p.left; + if (p == null || !p.right.equals(Component.DATA)) + return false; + + if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists()) + { + outputHandler.output(String.format("Skipping file %s because index is missing", name)); + return false; + } + + CFMetaData metadata = client.getTableMetadata(desc.cfname); + if (metadata == null) + { + outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname)); + return false; + } + + Set<Component> components = new HashSet<>(); + components.add(Component.DATA); + components.add(Component.PRIMARY_INDEX); + if (new File(desc.filenameFor(Component.SUMMARY)).exists()) + components.add(Component.SUMMARY); + if (new File(desc.filenameFor(Component.COMPRESSION_INFO)).exists()) + components.add(Component.COMPRESSION_INFO); + if (new File(desc.filenameFor(Component.STATS)).exists()) + components.add(Component.STATS); + + try + { + // To conserve memory, open SSTableReaders without bloom filters and discard + // the index summary after calculating the file sections to stream and the estimated + // number of keys for each endpoint. See CASSANDRA-5555 for details. 
+ SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata); + sstables.add(sstable); + + // calculate the sstable sections to stream as well as the estimated number of + // keys per host + for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : ranges.entrySet()) + { + InetAddress endpoint = entry.getKey(); + Collection<Range<Token>> tokenRanges = entry.getValue(); + + List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges); + long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges); + Ref<SSTableReader> ref = sstable.ref(); + StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE); + streamingDetails.put(endpoint, details); + } + + // to conserve heap space when bulk loading + sstable.releaseSummary(); + } + catch (IOException e) + { + outputHandler.output(String.format("Skipping file %s, error opening it: %s", name, e.getMessage())); + } + return false; + }, + Directories.OnTxnErr.IGNORE); + return sstables; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index d5c192e..5502669 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -45,7 +45,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.index.SecondaryIndex; -import org.apache.cassandra.db.lifecycle.TransactionLogs; +import org.apache.cassandra.db.lifecycle.TransactionLog; import org.apache.cassandra.db.rows.*; 
import org.apache.cassandra.dht.*; import org.apache.cassandra.io.FSError; @@ -1647,7 +1647,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * @return true if the this is the first time the file was marked obsolete. Calling this * multiple times is usually buggy (see exceptions in Tracker.unmarkCompacting and removeOldSSTablesSize). */ - public void markObsolete(TransactionLogs.SSTableTidier tidier) + public void markObsolete(TransactionLog.SSTableTidier tidier) { if (logger.isDebugEnabled()) logger.debug("Marking {} compacted", getFilename()); @@ -1903,22 +1903,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS : (sstableMetadata.totalRows == 0 ? 0 : (int)(sstableMetadata.totalColumnsSet / sstableMetadata.totalRows)); } - public Set<Integer> getAncestors() - { - try - { - CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION); - if (compactionMetadata != null) - return compactionMetadata.ancestors; - return Collections.emptySet(); - } - catch (IOException e) - { - SSTableReader.logOpenException(descriptor, e); - return Collections.emptySet(); - } - } - public int getSSTableLevel() { return sstableMetadata.sstableLevel; @@ -2191,7 +2175,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // sstable have been released private ScheduledFuture readMeterSyncFuture; // shared state managing if the logical sstable has been compacted; this is used in cleanup - private volatile TransactionLogs.SSTableTidier obsoletion; + private volatile TransactionLog.SSTableTidier obsoletion; GlobalTidy(final SSTableReader reader) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/sstable/format/Version.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java index 9ef0b43..4fcf055 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@ -66,6 +66,8 @@ public abstract class Version public abstract boolean hasOldBfHashOrder(); + public abstract boolean hasCompactionAncestors(); + public String getVersion() { return version; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index 6df4b1e..d65710e 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -140,6 +140,11 @@ public class BigFormat implements SSTableFormat */ private final boolean hasOldBfHashOrder; + /** + * CASSANDRA-7066: compaction ancestors are no longer used and have been removed. + */ + private final boolean hasCompactionAncestors; + BigVersion(String version) { super(instance, version); @@ -166,6 +171,7 @@ public class BigFormat implements SSTableFormat newFileName = version.compareTo("la") >= 0; hasOldBfHashOrder = version.compareTo("ma") < 0; + hasCompactionAncestors = version.compareTo("ma") < 0; storeRows = version.compareTo("ma") >= 0; correspondingMessagingVersion = storeRows ? 
MessagingService.VERSION_30 @@ -221,6 +227,12 @@ public class BigFormat implements SSTableFormat } @Override + public boolean hasCompactionAncestors() + { + return hasCompactionAncestors; + } + + @Override public boolean hasNewFileName() { return newFileName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java index 29cbe5b..fa94ead 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java @@ -18,8 +18,6 @@ package org.apache.cassandra.io.sstable.metadata; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.ICardinality; @@ -39,13 +37,10 @@ public class CompactionMetadata extends MetadataComponent { public static final IMetadataComponentSerializer serializer = new CompactionMetadataSerializer(); - public final Set<Integer> ancestors; - public final ICardinality cardinalityEstimator; - public CompactionMetadata(Set<Integer> ancestors, ICardinality cardinalityEstimator) + public CompactionMetadata(ICardinality cardinalityEstimator) { - this.ancestors = ancestors; this.cardinalityEstimator = cardinalityEstimator; } @@ -57,48 +52,46 @@ public class CompactionMetadata extends MetadataComponent @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; - CompactionMetadata that = (CompactionMetadata) o; - return ancestors == null ? 
that.ancestors == null : ancestors.equals(that.ancestors); + // keeping equals and hashCode as all classes inheriting from MetadataComponent + // implement them but we have really nothing to compare + return true; } @Override public int hashCode() { - return ancestors != null ? ancestors.hashCode() : 0; + // see comment in equals + return 31; } public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata> { public int serializedSize(CompactionMetadata component) throws IOException { - int size = 0; - size += TypeSizes.sizeof(component.ancestors.size()); - for (int g : component.ancestors) - size += TypeSizes.sizeof(g); byte[] serializedCardinality = component.cardinalityEstimator.getBytes(); - size += TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length; - return size; + return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length; } public void serialize(CompactionMetadata component, DataOutputPlus out) throws IOException { - out.writeInt(component.ancestors.size()); - for (int g : component.ancestors) - out.writeInt(g); ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out); } public CompactionMetadata deserialize(Version version, DataInputPlus in) throws IOException { - int nbAncestors = in.readInt(); - Set<Integer> ancestors = new HashSet<>(nbAncestors); - for (int i = 0; i < nbAncestors; i++) - ancestors.add(in.readInt()); + if (version.hasCompactionAncestors()) + { // skip ancestors + int nbAncestors = in.readInt(); + in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors)); + } ICardinality cardinality = HyperLogLogPlus.Builder.build(ByteBufferUtil.readBytes(in, in.readInt())); - return new CompactionMetadata(ancestors, cardinality); + return new CompactionMetadata(cardinality); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java index 433c31a..53ba922 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java @@ -23,6 +23,7 @@ import java.util.*; import com.google.common.collect.Maps; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -60,9 +61,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer out.writeDouble(validation.bloomFilterFPChance); out.writeDouble(stats.compressionRatio); out.writeUTF(validation.partitioner); - out.writeInt(compaction.ancestors.size()); - for (Integer g : compaction.ancestors) - out.writeInt(g); + out.writeInt(0); // compaction ancestors StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out); out.writeInt(stats.sstableLevel); out.writeInt(stats.minClusteringValues.size()); @@ -99,10 +98,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer double bloomFilterFPChance = in.readDouble(); double compressionRatio = in.readDouble(); String partitioner = in.readUTF(); - int nbAncestors = in.readInt(); - Set<Integer> ancestors = new HashSet<>(nbAncestors); - for (int i = 0; i < nbAncestors; i++) - ancestors.add(in.readInt()); + int nbAncestors = in.readInt(); //skip compaction ancestors + in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors)); StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in); int sstableLevel = 0; if (in.available() > 0) @@ -143,7 +140,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer -1)); if 
(types.contains(MetadataType.COMPACTION)) components.put(MetadataType.COMPACTION, - new CompactionMetadata(ancestors, null)); + new CompactionMetadata(null)); } } return components; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 9b06b53..1c93f58 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -94,7 +94,6 @@ public class MetadataCollector implements PartitionStatisticsCollector protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME); protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL); protected double compressionRatio = NO_COMPRESSION_RATIO; - protected Set<Integer> ancestors = new HashSet<>(); protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram(); protected int sstableLevel; protected ByteBuffer[] minClusteringValues; @@ -120,29 +119,12 @@ public class MetadataCollector implements PartitionStatisticsCollector this.maxClusteringValues = new ByteBuffer[comparator.size()]; } - public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level, boolean skipAncestors) + public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level) { this(comparator); replayPosition(ReplayPosition.getReplayPosition(sstables)); sstableLevel(level); - // Get the max timestamp of the precompacted sstables - // and adds generation of live ancestors - if (!skipAncestors) - { - for (SSTableReader sstable : sstables) - { - 
addAncestor(sstable.descriptor.generation); - for (Integer i : sstable.getAncestors()) - if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists()) - addAncestor(i); - } - } - } - - public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level) - { - this(sstables, comparator, level, false); } public MetadataCollector addKey(ByteBuffer key) @@ -237,12 +219,6 @@ public class MetadataCollector implements PartitionStatisticsCollector return this; } - public MetadataCollector addAncestor(int generation) - { - this.ancestors.add(generation); - return this; - } - public MetadataCollector sstableLevel(int sstableLevel) { this.sstableLevel = sstableLevel; @@ -313,7 +289,7 @@ public class MetadataCollector implements PartitionStatisticsCollector repairedAt, totalColumnsSet, totalRows)); - components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality)); + components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality)); components.put(MetadataType.HEADER, header.toComponent()); return components; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index c3de1db..0ba2307 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -616,11 +616,6 @@ public class FileUtils { return Files.readAllLines(file.toPath(), Charset.forName("utf-8")); } - catch (NoSuchFileException ex) - { - logger.warn("Tried to read non existing file: {}", file); - return Collections.emptyList(); - } catch (IOException ex) { throw new RuntimeException(ex); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/service/GCInspector.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java index fc7ff3d..252d1c3 100644 --- a/src/java/org/apache/cassandra/service/GCInspector.java +++ b/src/java/org/apache/cassandra/service/GCInspector.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; import com.sun.management.GarbageCollectionNotificationInfo; import com.sun.management.GcInfo; -import org.apache.cassandra.db.lifecycle.TransactionLogs; +import org.apache.cassandra.db.lifecycle.TransactionLog; import org.apache.cassandra.utils.StatusLogger; public class GCInspector implements NotificationListener, GCInspectorMXBean @@ -284,7 +284,7 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean // if we just finished an old gen collection and we're still using a lot of memory, try to reduce the pressure if (gcState.assumeGCIsOldGen) - TransactionLogs.rescheduleFailedDeletions(); + TransactionLog.rescheduleFailedDeletions(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/service/StartupChecks.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java index fee9789..9ffef96 100644 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@ -237,8 +237,7 @@ public class StartupChecks { String name = dir.getFileName().toString(); return (name.equals(Directories.SNAPSHOT_SUBDIR) - || name.equals(Directories.BACKUPS_SUBDIR) - || name.equals(Directories.TRANSACTIONS_SUBDIR)) + || name.equals(Directories.BACKUPS_SUBDIR)) ? 
FileVisitResult.SKIP_SUBTREE : FileVisitResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 959f7e3..5966e49 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -70,7 +70,7 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.lifecycle.TransactionLogs; +import org.apache.cassandra.db.lifecycle.TransactionLog; import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -4241,7 +4241,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void rescheduleFailedDeletions() { - TransactionLogs.rescheduleFailedDeletions(); + TransactionLog.rescheduleFailedDeletions(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java index bc17750..c8587d8 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java +++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java @@ -34,6 +34,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; +import 
org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -68,7 +69,7 @@ public class SSTableExpiredBlockers Keyspace ks = Keyspace.openWithoutSSTables(keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily); - Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true); + Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); Set<SSTableReader> sstables = new HashSet<>(); for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java index d73a325..cb3cc5c 100644 --- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java +++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -75,7 +76,7 @@ public class SSTableLevelResetter Keyspace keyspace = Keyspace.openWithoutSSTables(keyspaceName); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnfamily); boolean foundSSTable = false; - for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister().list().entrySet()) + for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().entrySet()) { 
if (sstable.getValue().contains(Component.STATS)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java index a644f49..7764bbf 100644 --- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java +++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java @@ -78,9 +78,7 @@ public class SSTableMetadataViewer } if (compaction != null) { - out.printf("Ancestors: %s%n", compaction.ancestors.toString()); out.printf("Estimated cardinality: %s%n", compaction.cardinalityEstimator.cardinality()); - } } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java index f7e477f..95f516a 100644 --- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java +++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java @@ -95,7 +95,7 @@ public class SSTableOfflineRelevel Keyspace ks = Keyspace.openWithoutSSTables(keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily); - Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true); + Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); Set<SSTableReader> sstables = new HashSet<>(); for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/StandaloneLister.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneLister.java b/src/java/org/apache/cassandra/tools/StandaloneLister.java deleted file mode 100644 index 71575c7..0000000 --- a/src/java/org/apache/cassandra/tools/StandaloneLister.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.cassandra.tools; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.utils.OutputHandler; -import org.apache.commons.cli.*; - -import java.io.File; - -import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions; - -public class StandaloneLister -{ - private static final String TOOL_NAME = "sstablelister"; - private static final String TYPE_OPTION = "type"; - private static final String OP_LOG_OPTION = "oplog"; - private static final String VERBOSE_OPTION = "verbose"; - private static final String DEBUG_OPTION = "debug"; - private static final String HELP_OPTION = "help"; - - public static void main(String args[]) - { - Options options = Options.parseArgs(args); - try - { - // load keyspace descriptions. - Schema.instance.loadFromDisk(false); - - CFMetaData metadata = Schema.instance.getCFMetaData(options.keyspaceName, options.cfName); - if (metadata == null) - throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s", - options.keyspaceName, - options.cfName)); - - OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - - Directories directories = new Directories(metadata); - Directories.SSTableLister lister = directories.sstableLister(); - - if (options.type == Options.FileType.FINAL) - lister.skipTemporary(true); - else if (options.type == Options.FileType.TMP) - lister.onlyTemporary(true); - - for (File file : lister.listFiles()) - handler.output(file.getCanonicalPath()); - - if (options.oplogs) - { - for (File file : LifecycleTransaction.getLogFiles(metadata)) - { - handler.output(file.getCanonicalPath()); - } - } - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - if (options.debug) - e.printStackTrace(System.err); - System.exit(1); - } - } - - private static 
class Options - { - public enum FileType - { - ALL("all", "list all files, final or temporary"), - TMP("tmp", "list temporary files only"), - FINAL("final", "list final files only"); - - public String option; - public String descr; - FileType(String option, String descr) - { - this.option = option; - this.descr = descr; - } - - static FileType fromOption(String option) - { - for (FileType fileType : FileType.values()) - { - if (fileType.option.equals(option)) - return fileType; - } - - return FileType.ALL; - } - - static String descr() - { - StringBuilder str = new StringBuilder(); - for (FileType fileType : FileType.values()) - { - str.append(fileType.option); - str.append(" ("); - str.append(fileType.descr); - str.append("), "); - } - return str.toString(); - } - } - - public final String keyspaceName; - public final String cfName; - - public boolean debug; - public boolean verbose; - public boolean oplogs; - public FileType type; - - private Options(String keyspaceName, String cfName) - { - this.keyspaceName = keyspaceName; - this.cfName = cfName; - } - - public static Options parseArgs(String cmdArgs[]) - { - CommandLineParser parser = new GnuParser(); - CmdLineOptions options = getCmdLineOptions(); - try - { - CommandLine cmd = parser.parse(options, cmdArgs, false); - - if (cmd.hasOption(HELP_OPTION)) - { - printUsage(options); - System.exit(0); - } - - String[] args = cmd.getArgs(); - if (args.length != 2) - { - String msg = args.length < 2 ? 
"Missing arguments" : "Too many arguments"; - System.err.println(msg); - printUsage(options); - System.exit(1); - } - - String keyspaceName = args[0]; - String cfName = args[1]; - - Options opts = new Options(keyspaceName, cfName); - - opts.debug = cmd.hasOption(DEBUG_OPTION); - opts.verbose = cmd.hasOption(VERBOSE_OPTION); - opts.type = FileType.fromOption(cmd.getOptionValue(TYPE_OPTION)); - opts.oplogs = cmd.hasOption(OP_LOG_OPTION); - - return opts; - } - catch (ParseException e) - { - errorMsg(e.getMessage(), options); - return null; - } - } - - private static void errorMsg(String msg, CmdLineOptions options) - { - System.err.println(msg); - printUsage(options); - System.exit(1); - } - - private static CmdLineOptions getCmdLineOptions() - { - CmdLineOptions options = new CmdLineOptions(); - options.addOption("d", DEBUG_OPTION, "display stack traces"); - options.addOption("h", HELP_OPTION, "display this help message"); - options.addOption("o", OP_LOG_OPTION, "include operation logs"); - options.addOption("t", TYPE_OPTION, true, FileType.descr()); - options.addOption("v", VERBOSE_OPTION, "verbose output"); - - return options; - } - - public static void printUsage(CmdLineOptions options) - { - String usage = String.format("%s [options] <keyspace> <column_family>", TOOL_NAME); - StringBuilder header = new StringBuilder(); - header.append("--\n"); - header.append("List sstable files for the provided table." 
); - header.append("\n--\n"); - header.append("Options are:"); - new HelpFormatter().printHelp(usage, header.toString(), options, ""); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java new file mode 100644 index 0000000..6e2be1d --- /dev/null +++ b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cassandra.tools; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.utils.OutputHandler; +import org.apache.commons.cli.*; + +import java.io.File; +import java.io.IOException; +import java.util.function.BiFunction; + +import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions; + +public class StandaloneSSTableUtil +{ + private static final String TOOL_NAME = "sstableutil"; + private static final String TYPE_OPTION = "type"; + private static final String OP_LOG_OPTION = "oplog"; + private static final String VERBOSE_OPTION = "verbose"; + private static final String DEBUG_OPTION = "debug"; + private static final String HELP_OPTION = "help"; + private static final String CLEANUP_OPTION = "cleanup"; + + public static void main(String args[]) + { + Options options = Options.parseArgs(args); + try + { + // load keyspace descriptions. 
+ Schema.instance.loadFromDisk(false); + + CFMetaData metadata = Schema.instance.getCFMetaData(options.keyspaceName, options.cfName); + if (metadata == null) + throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s", + options.keyspaceName, + options.cfName)); + + OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); + + if (options.cleanup) + { + handler.output("Cleanuping up..."); + LifecycleTransaction.removeUnfinishedLeftovers(metadata); + } + else + { + handler.output("Listing files..."); + listFiles(options, metadata, handler); + } + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + if (options.debug) + e.printStackTrace(System.err); + System.exit(1); + } + } + + private static void listFiles(Options options, CFMetaData metadata, OutputHandler handler) throws IOException + { + Directories directories = new Directories(metadata); + + for (File dir : directories.getCFDirectories()) + { + for (File file : LifecycleTransaction.getFiles(dir.toPath(), getFilter(options), Directories.OnTxnErr.THROW)) + handler.output(file.getCanonicalPath()); + } + } + + private static BiFunction<File, Directories.FileType, Boolean> getFilter(Options options) + { + return (file, type) -> + { + switch(type) + { + case FINAL: + return options.type != Options.FileType.TMP; + case TEMPORARY: + return options.type != Options.FileType.FINAL; + case TXN_LOG: + return options.oplogs; + default: + throw new AssertionError(); + } + }; + } + + private static class Options + { + public enum FileType + { + ALL("all", "list all files, final or temporary"), + TMP("tmp", "list temporary files only"), + FINAL("final", "list final files only"); + + public String option; + public String descr; + FileType(String option, String descr) + { + this.option = option; + this.descr = descr; + } + + static FileType fromOption(String option) + { + for (FileType fileType : FileType.values()) + { + if 
(fileType.option.equals(option)) + return fileType; + } + + return FileType.ALL; + } + + static String descr() + { + StringBuilder str = new StringBuilder(); + for (FileType fileType : FileType.values()) + { + str.append(fileType.option); + str.append(" ("); + str.append(fileType.descr); + str.append("), "); + } + return str.toString(); + } + } + + public final String keyspaceName; + public final String cfName; + + public boolean debug; + public boolean verbose; + public boolean oplogs; + public boolean cleanup; + public FileType type; + + private Options(String keyspaceName, String cfName) + { + this.keyspaceName = keyspaceName; + this.cfName = cfName; + } + + public static Options parseArgs(String cmdArgs[]) + { + CommandLineParser parser = new GnuParser(); + CmdLineOptions options = getCmdLineOptions(); + try + { + CommandLine cmd = parser.parse(options, cmdArgs, false); + + if (cmd.hasOption(HELP_OPTION)) + { + printUsage(options); + System.exit(0); + } + + String[] args = cmd.getArgs(); + if (args.length != 2) + { + String msg = args.length < 2 ? 
"Missing arguments" : "Too many arguments"; + System.err.println(msg); + printUsage(options); + System.exit(1); + } + + String keyspaceName = args[0]; + String cfName = args[1]; + + Options opts = new Options(keyspaceName, cfName); + + opts.debug = cmd.hasOption(DEBUG_OPTION); + opts.verbose = cmd.hasOption(VERBOSE_OPTION); + opts.type = FileType.fromOption(cmd.getOptionValue(TYPE_OPTION)); + opts.oplogs = cmd.hasOption(OP_LOG_OPTION); + opts.cleanup = cmd.hasOption(CLEANUP_OPTION); + + return opts; + } + catch (ParseException e) + { + errorMsg(e.getMessage(), options); + return null; + } + } + + private static void errorMsg(String msg, CmdLineOptions options) + { + System.err.println(msg); + printUsage(options); + System.exit(1); + } + + private static CmdLineOptions getCmdLineOptions() + { + CmdLineOptions options = new CmdLineOptions(); + options.addOption("c", CLEANUP_OPTION, "clean-up any outstanding transactions"); + options.addOption("d", DEBUG_OPTION, "display stack traces"); + options.addOption("h", HELP_OPTION, "display this help message"); + options.addOption("o", OP_LOG_OPTION, "include operation logs"); + options.addOption("t", TYPE_OPTION, true, FileType.descr()); + options.addOption("v", VERBOSE_OPTION, "verbose output"); + + return options; + } + + public static void printUsage(CmdLineOptions options) + { + String usage = String.format("%s [options] <keyspace> <column_family>", TOOL_NAME); + StringBuilder header = new StringBuilder(); + header.append("--\n"); + header.append("List sstable files for the provided table." 
); + header.append("\n--\n"); + header.append("Options are:"); + new HelpFormatter().printHelp(usage, header.toString(), options, ""); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 9388d98..f64b8d9 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -34,7 +34,7 @@ import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.lifecycle.TransactionLogs; +import org.apache.cassandra.db.lifecycle.TransactionLog; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -84,7 +84,7 @@ public class StandaloneScrubber String snapshotName = "pre-scrub-" + System.currentTimeMillis(); OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true); + Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); List<SSTableReader> sstables = new ArrayList<>(); @@ -145,7 +145,7 @@ public class StandaloneScrubber // Check (and repair) manifests checkManifest(cfs.getCompactionStrategyManager(), cfs, sstables); CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES); - TransactionLogs.waitForDeletions(); + TransactionLog.waitForDeletions(); System.exit(0); // We need that to stop non daemonized threads } catch (Exception e) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java index e53038d..a192491 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java +++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java @@ -23,7 +23,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.lifecycle.TransactionLogs; +import org.apache.cassandra.db.lifecycle.TransactionLog; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; @@ -164,7 +164,7 @@ public class StandaloneSplitter } } CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES); - TransactionLogs.waitForDeletions(); + TransactionLog.waitForDeletions(); System.exit(0); // We need that to stop non daemonized threads } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java index 0851d5b..88e34b7 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java +++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java @@ -20,7 +20,7 @@ package org.apache.cassandra.tools; import java.util.*; import java.util.concurrent.TimeUnit; -import org.apache.cassandra.db.lifecycle.TransactionLogs; +import org.apache.cassandra.db.lifecycle.TransactionLog; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; @@ -63,7 +63,7 @@ public class StandaloneUpgrader ColumnFamilyStore 
cfs = keyspace.getColumnFamilyStore(options.cf); OutputHandler handler = new OutputHandler.SystemOutput(false, options.debug); - Directories.SSTableLister lister = cfs.directories.sstableLister(); + Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW); if (options.snapshot != null) lister.onlyBackups(true).snapshots(options.snapshot); else @@ -120,7 +120,7 @@ public class StandaloneUpgrader } } CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES); - TransactionLogs.waitForDeletions(); + TransactionLog.waitForDeletions(); System.exit(0); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java index f71f58d..0b17e39 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java +++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java @@ -69,7 +69,7 @@ public class StandaloneVerifier ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName); OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true); + Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); boolean extended = options.extended; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/tools/nodetool/Stop.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Stop.java b/src/java/org/apache/cassandra/tools/nodetool/Stop.java index ad1fc27..6229e65 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Stop.java +++ 
b/src/java/org/apache/cassandra/tools/nodetool/Stop.java @@ -36,7 +36,7 @@ public class Stop extends NodeToolCmd @Option(title = "compactionId", name = {"-id", "--compaction-id"}, - description = "Use -id to stop a compaction by the specified id. Ids can be found in the system.compactions_in_progress table.", + description = "Use -id to stop a compaction by the specified id. Ids can be found in the transaction log files whose name starts with compaction_, located in the table transactions folder.", required = false) private String compactionId = ""; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 47c07ac..9e36bab 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -532,7 +532,7 @@ public class ColumnFamilyStoreTest ColumnFamilyStore.scrubDataDirectories(cfs.metadata); - List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister().listFiles(); + List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister(Directories.OnTxnErr.THROW).listFiles(); assertNotNull(ssTableFiles); assertEquals(0, ssTableFiles.size()); }
