Repository: cassandra Updated Branches: refs/heads/trunk 307890363 -> e16d8a7a6
Enhanced Compaction Logging patch by Carl Yeksigian; reviewed by Marcus Eriksson for CASSANDRA-10805 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e16d8a7a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e16d8a7a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e16d8a7a Branch: refs/heads/trunk Commit: e16d8a7a667d50271a183a95be894126cb2a5414 Parents: 3078903 Author: Carl Yeksigian <c...@apache.org> Authored: Mon May 2 15:01:39 2016 -0400 Committer: Carl Yeksigian <c...@apache.org> Committed: Mon May 2 15:03:38 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 13 +- .../compaction/AbstractCompactionStrategy.java | 23 +- .../db/compaction/CompactionLogger.java | 342 +++++++++++++++++++ .../compaction/CompactionStrategyManager.java | 39 ++- .../cassandra/db/compaction/CompactionTask.java | 2 + .../DateTieredCompactionStrategy.java | 31 ++ .../DateTieredCompactionStrategyOptions.java | 8 +- .../compaction/LeveledCompactionStrategy.java | 27 +- .../SizeTieredCompactionStrategy.java | 1 + 10 files changed, 475 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c802031..1a3069c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Enhanced Compaction Logging (CASSANDRA-10805) * Make prepared statement cache size configurable (CASSANDRA-11555) * Integrated JMX authentication and authorization (CASSANDRA-10091) * Add units to stress ouput (CASSANDRA-11352) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index b47cf85..6b841c2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1163,12 +1163,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } memtable.cfs.replaceFlushed(memtable, sstables); reclaim(memtable); - logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}", - sstables, - sstables.size(), - FBUtilities.prettyPrintMemory(totalBytesOnDisk), - FBUtilities.prettyPrintMemory(maxBytesOnDisk), - FBUtilities.prettyPrintMemory(minBytesOnDisk)); + memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables); + logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}", + sstables, + sstables.size(), + FBUtilities.prettyPrintMemory(totalBytesOnDisk), + FBUtilities.prettyPrintMemory(maxBytesOnDisk), + FBUtilities.prettyPrintMemory(minBytesOnDisk)); } private void reclaim(final Memtable memtable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 40f0ce2..668bc51 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -63,11 +63,13 @@ public abstract class AbstractCompactionStrategy // minimum interval needed to perform tombstone removal compaction in seconds, default 86400 or 1 day. 
protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400; protected static final boolean DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION = false; + protected static final boolean DEFAULT_LOG_ALL_OPTION = false; protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold"; protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval"; // disable range overlap check when deciding if an SSTable is candidate for tombstone compaction (CASSANDRA-6563) protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction"; + protected static final String LOG_ALL_OPTION = "log_all"; protected static final String COMPACTION_ENABLED = "enabled"; public static final String ONLY_PURGE_REPAIRED_TOMBSTONES = "only_purge_repaired_tombstones"; @@ -78,6 +80,7 @@ public abstract class AbstractCompactionStrategy protected long tombstoneCompactionInterval; protected boolean uncheckedTombstoneCompaction; protected boolean disableTombstoneCompactions = false; + protected boolean logAll = true; private final Directories directories; @@ -110,6 +113,8 @@ public abstract class AbstractCompactionStrategy tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue); optionValue = options.get(UNCHECKED_TOMBSTONE_COMPACTION_OPTION); uncheckedTombstoneCompaction = optionValue == null ? DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION : Boolean.parseBoolean(optionValue); + optionValue = options.get(LOG_ALL_OPTION); + logAll = optionValue == null ? 
DEFAULT_LOG_ALL_OPTION : Boolean.parseBoolean(optionValue); if (!shouldBeEnabled()) this.disable(); } @@ -463,7 +468,16 @@ public abstract class AbstractCompactionStrategy if (unchecked != null) { if (!unchecked.equalsIgnoreCase("true") && !unchecked.equalsIgnoreCase("false")) - throw new ConfigurationException(String.format("'%s' should be either 'true' or 'false', not '%s'",UNCHECKED_TOMBSTONE_COMPACTION_OPTION, unchecked)); + throw new ConfigurationException(String.format("'%s' should be either 'true' or 'false', not '%s'", UNCHECKED_TOMBSTONE_COMPACTION_OPTION, unchecked)); + } + + String logAll = options.get(LOG_ALL_OPTION); + if (logAll != null) + { + if (!logAll.equalsIgnoreCase("true") && !logAll.equalsIgnoreCase("false")) + { + throw new ConfigurationException(String.format("'%s' should either be 'true' or 'false', not %s", LOG_ALL_OPTION, logAll)); + } } String compactionEnabled = options.get(COMPACTION_ENABLED); @@ -474,10 +488,12 @@ public abstract class AbstractCompactionStrategy throw new ConfigurationException(String.format("enabled should either be 'true' or 'false', not %s", compactionEnabled)); } } + Map<String, String> uncheckedOptions = new HashMap<String, String>(options); uncheckedOptions.remove(TOMBSTONE_THRESHOLD_OPTION); uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION); uncheckedOptions.remove(UNCHECKED_TOMBSTONE_COMPACTION_OPTION); + uncheckedOptions.remove(LOG_ALL_OPTION); uncheckedOptions.remove(COMPACTION_ENABLED); uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES); return uncheckedOptions; @@ -521,6 +537,11 @@ public abstract class AbstractCompactionStrategy return groupedSSTables; } + public CompactionLogger.Strategy strategyLogger() + { + return CompactionLogger.Strategy.none; + } + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java b/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java new file mode 100644 index 0000000..16a7f2a --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.file.*; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.NoSpamLogger; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; + +public class CompactionLogger +{ + public interface Strategy + { + JsonNode sstable(SSTableReader sstable); + + JsonNode options(); + + static Strategy none = new Strategy() + { + public JsonNode sstable(SSTableReader sstable) + { + return null; + } + + public JsonNode options() + { + return null; + } + }; + } + + /** + * This will produce the compaction strategy's starting information. + */ + public interface StrategySummary + { + JsonNode getSummary(); + } + + /** + * This is an interface to allow writing to a different interface. 
+ */ + public interface Writer + { + /** + * This is used when we are already trying to write out the start of a strategy (no separate summary is supplied) + * @param statement This should be written out to the medium capturing the logs + * @param tag This is an identifier for a strategy; each strategy should have a distinct Object + */ + void writeStart(JsonNode statement, Object tag); + + /** + * @param statement This should be written out to the medium capturing the logs + * @param summary This can be used when a tag is not recognized by this writer; this can be because the file + * has been rolled, or otherwise the writer had to start over + * @param tag This is an identifier for a strategy; each strategy should have a distinct Object + */ + void write(JsonNode statement, StrategySummary summary, Object tag); + } + + private interface CompactionStrategyAndTableFunction + { + JsonNode apply(AbstractCompactionStrategy strategy, SSTableReader sstable); + } + + private static final JsonNodeFactory json = JsonNodeFactory.instance; + private static final Logger logger = LoggerFactory.getLogger(CompactionLogger.class); + private static final Writer serializer = new CompactionLogSerializer(); + private final ColumnFamilyStore cfs; + private final CompactionStrategyManager csm; + private final AtomicInteger identifier = new AtomicInteger(0); + private final Map<AbstractCompactionStrategy, String> compactionStrategyMapping = new ConcurrentHashMap<>(); + private final AtomicBoolean enabled = new AtomicBoolean(false); + + public CompactionLogger(ColumnFamilyStore cfs, CompactionStrategyManager csm) + { + this.csm = csm; + this.cfs = cfs; + } + + private void forEach(Consumer<AbstractCompactionStrategy> consumer) + { + csm.getStrategies() + .forEach(l -> l.forEach(consumer)); + } + + private ArrayNode compactionStrategyMap(Function<AbstractCompactionStrategy, JsonNode> select) + { + ArrayNode node = json.arrayNode(); + forEach(acs -> node.add(select.apply(acs))); + return node; + } + + private ArrayNode 
sstableMap(Collection<SSTableReader> sstables, CompactionStrategyAndTableFunction csatf) + { + ArrayNode node = json.arrayNode(); + sstables.forEach(t -> node.add(csatf.apply(csm.getCompactionStrategyFor(t), t))); + return node; + } + + private String getId(AbstractCompactionStrategy strategy) + { + return compactionStrategyMapping.computeIfAbsent(strategy, s -> String.valueOf(identifier.getAndIncrement())); + } + + private JsonNode formatSSTables(AbstractCompactionStrategy strategy) + { + ArrayNode node = json.arrayNode(); + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + if (csm.getCompactionStrategyFor(sstable) == strategy) + node.add(formatSSTable(strategy, sstable)); + } + return node; + } + + private JsonNode formatSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable) + { + ObjectNode node = json.objectNode(); + node.put("generation", sstable.descriptor.generation); + node.put("version", sstable.descriptor.version.getVersion()); + node.put("size", sstable.onDiskLength()); + JsonNode logResult = strategy.strategyLogger().sstable(sstable); + if (logResult != null) + node.put("details", logResult); + return node; + } + + private JsonNode startStrategy(AbstractCompactionStrategy strategy) + { + ObjectNode node = json.objectNode(); + node.put("strategyId", getId(strategy)); + node.put("type", strategy.getName()); + node.put("tables", formatSSTables(strategy)); + node.put("repaired", csm.isRepaired(strategy)); + List<String> folders = csm.getStrategyFolders(strategy); + ArrayNode folderNode = json.arrayNode(); + for (String folder : folders) + { + folderNode.add(folder); + } + node.put("folders", folderNode); + + JsonNode logResult = strategy.strategyLogger().options(); + if (logResult != null) + node.put("options", logResult); + return node; + } + + private JsonNode shutdownStrategy(AbstractCompactionStrategy strategy) + { + ObjectNode node = json.objectNode(); + node.put("strategyId", getId(strategy)); + return node; + } + + private 
JsonNode describeSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable) + { + ObjectNode node = json.objectNode(); + node.put("strategyId", getId(strategy)); + node.put("table", formatSSTable(strategy, sstable)); + return node; + } + + private void describeStrategy(ObjectNode node) + { + node.put("keyspace", cfs.keyspace.getName()); + node.put("table", cfs.getTableName()); + node.put("time", System.currentTimeMillis()); + } + + private JsonNode startStrategies() + { + ObjectNode node = json.objectNode(); + node.put("type", "enable"); + describeStrategy(node); + node.put("strategies", compactionStrategyMap(this::startStrategy)); + return node; + } + + public void enable() + { + if (enabled.compareAndSet(false, true)) + { + serializer.writeStart(startStrategies(), this); + } + } + + public void disable() + { + if (enabled.compareAndSet(true, false)) + { + ObjectNode node = json.objectNode(); + node.put("type", "disable"); + describeStrategy(node); + node.put("strategies", compactionStrategyMap(this::shutdownStrategy)); + serializer.write(node, this::startStrategies, this); + } + } + + public void flush(Collection<SSTableReader> sstables) + { + if (enabled.get()) + { + ObjectNode node = json.objectNode(); + node.put("type", "flush"); + describeStrategy(node); + node.put("tables", sstableMap(sstables, this::describeSSTable)); + serializer.write(node, this::startStrategies, this); + } + } + + public void compaction(long startTime, Collection<SSTableReader> input, long endTime, Collection<SSTableReader> output) + { + if (enabled.get()) + { + ObjectNode node = json.objectNode(); + node.put("type", "compaction"); + describeStrategy(node); + node.put("start", String.valueOf(startTime)); + node.put("end", String.valueOf(endTime)); + node.put("input", sstableMap(input, this::describeSSTable)); + node.put("output", sstableMap(output, this::describeSSTable)); + serializer.write(node, this::startStrategies, this); + } + } + + public void 
pending(AbstractCompactionStrategy strategy, int remaining) + { + if (remaining != 0 && enabled.get()) + { + ObjectNode node = json.objectNode(); + node.put("type", "pending"); + describeStrategy(node); + node.put("strategyId", getId(strategy)); + node.put("pending", remaining); + serializer.write(node, this::startStrategies, this); + } + } + + private static class CompactionLogSerializer implements Writer + { + private static final String logDirectory = System.getProperty("cassandra.logdir", "."); + private final ExecutorService loggerService = Executors.newFixedThreadPool(1); + // This is only accessed on the logger service thread, so it does not need to be thread safe + private final Set<Object> rolled = new HashSet<>(); + private OutputStreamWriter stream; + + private static OutputStreamWriter createStream() throws IOException + { + int count = 0; + Path compactionLog = Paths.get(logDirectory, "compaction.log"); + if (Files.exists(compactionLog)) + { + Path tryPath = compactionLog; + while (Files.exists(tryPath)) + { + tryPath = Paths.get(logDirectory, String.format("compaction-%d.log", count++)); + } + Files.move(compactionLog, tryPath); + } + + return new OutputStreamWriter(Files.newOutputStream(compactionLog, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)); + } + + private void writeLocal(String toWrite) + { + try + { + if (stream == null) + stream = createStream(); + stream.write(toWrite); + stream.flush(); + } + catch (IOException ioe) + { + // We'll drop the change and log the error to the logger. 
+ NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 1, TimeUnit.MINUTES, + "Could not write to the log file: {}", ioe); + } + + } + + public void writeStart(JsonNode statement, Object tag) + { + final String toWrite = statement.toString() + System.lineSeparator(); + loggerService.execute(() -> { + rolled.add(tag); + writeLocal(toWrite); + }); + } + + public void write(JsonNode statement, StrategySummary summary, Object tag) + { + final String toWrite = statement.toString() + System.lineSeparator(); + loggerService.execute(() -> { + if (!rolled.contains(tag)) + { + writeLocal(summary.getSummary().toString() + System.lineSeparator()); + rolled.add(tag); + } + writeLocal(toWrite); + }); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index be861e1..4d93294 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -22,6 +22,7 @@ import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; @@ -61,6 +62,7 @@ import org.apache.cassandra.service.StorageService; public class CompactionStrategyManager implements INotificationConsumer { private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class); + public final CompactionLogger compactionLogger; private final ColumnFamilyStore cfs; private final List<AbstractCompactionStrategy> repaired = new ArrayList<>(); private final 
List<AbstractCompactionStrategy> unrepaired = new ArrayList<>(); @@ -86,6 +88,7 @@ public class CompactionStrategyManager implements INotificationConsumer cfs.getTracker().subscribe(this); logger.trace("{} subscribed to the data tracker.", this); this.cfs = cfs; + this.compactionLogger = new CompactionLogger(cfs, this); reload(cfs.metadata); params = cfs.metadata.params.compaction; locations = getDirectories().getWriteableLocations(); @@ -162,6 +165,10 @@ public class CompactionStrategyManager implements INotificationConsumer { writeLock.unlock(); } + repaired.forEach(AbstractCompactionStrategy::startup); + unrepaired.forEach(AbstractCompactionStrategy::startup); + if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll)) + compactionLogger.enable(); } /** @@ -171,7 +178,7 @@ public class CompactionStrategyManager implements INotificationConsumer * @param sstable * @return */ - private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable) + public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable) { int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable); readLock.lock(); @@ -234,6 +241,7 @@ public class CompactionStrategyManager implements INotificationConsumer isActive = false; repaired.forEach(AbstractCompactionStrategy::shutdown); unrepaired.forEach(AbstractCompactionStrategy::shutdown); + compactionLogger.disable(); } finally { @@ -847,4 +855,33 @@ public class CompactionStrategyManager implements INotificationConsumer readLock.unlock(); } } + + public boolean isRepaired(AbstractCompactionStrategy strategy) + { + return repaired.contains(strategy); + } + + public List<String> getStrategyFolders(AbstractCompactionStrategy strategy) + { + Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations(); + if (cfs.getPartitioner().splitter().isPresent()) + { + int unrepairedIndex = unrepaired.indexOf(strategy); + if (unrepairedIndex >= 0) /* indexOf returns -1 when absent; 0 is a valid match (first directory) */ + { + return 
Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath()); + } + int repairedIndex = repaired.indexOf(strategy); + if (repairedIndex >= 0) /* same rule: only -1 means the strategy is not in this list */ + { + return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath()); + } + } + List<String> folders = new ArrayList<>(locations.length); + for (Directories.DataDirectory location : locations) + { + folders.add(location.location.getAbsolutePath()); + } + return folders; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 1465ba4..5df91fd 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -150,6 +150,7 @@ public class CompactionTask extends AbstractCompactionTask logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg); long start = System.nanoTime(); + long startTime = System.currentTimeMillis(); long totalKeysWritten = 0; long estimatedKeys = 0; try (CompactionController controller = getCompactionController(transaction.originals())) @@ -234,6 +235,7 @@ public class CompactionTask extends AbstractCompactionTask mergeSummary)); logger.trace(String.format("CF Total Bytes Compacted: %s", FBUtilities.prettyPrintMemory(CompactionTask.addToTotalBytesCompacted(endsize)))); logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); + cfs.getCompactionStrategyManager().compactionLogger.compaction(startTime, transaction.originals(), System.currentTimeMillis(), newSStables); // update the metrics cfs.metric.compactionBytesWritten.inc(endsize); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 9a17e06..7c1ff13 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.compaction; import java.util.*; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; @@ -32,6 +33,9 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.utils.Pair; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; import static com.google.common.collect.Iterables.filter; @@ -342,6 +346,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy n += Math.ceil((double)stcsBucket.size() / cfs.getMaximumCompactionThreshold()); } estimatedRemainingTasks = n; + cfs.getCompactionStrategyManager().compactionLogger.pending(this, n); } @@ -453,6 +458,32 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy return uncheckedOptions; } + public CompactionLogger.Strategy strategyLogger() { + return new CompactionLogger.Strategy() + { + public JsonNode sstable(SSTableReader sstable) + { + ObjectNode node = JsonNodeFactory.instance.objectNode(); + node.put("min_timestamp", sstable.getMinTimestamp()); + node.put("max_timestamp", sstable.getMaxTimestamp()); + return node; + } + + public JsonNode 
options() + { + ObjectNode node = JsonNodeFactory.instance.objectNode(); + TimeUnit resolution = DateTieredCompactionStrategy.this.options.timestampResolution; + node.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, + resolution.toString()); + node.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, + resolution.toSeconds(DateTieredCompactionStrategy.this.options.baseTime)); + node.put(DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY, + resolution.toSeconds(DateTieredCompactionStrategy.this.options.maxWindowSize)); + return node; + } + }; + } + public String toString() { return String.format("DateTieredCompactionStrategy[%s/%s]", http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java index 78a0cab..fee9e34 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java @@ -44,6 +44,7 @@ public final class DateTieredCompactionStrategyOptions @Deprecated protected final long maxSSTableAge; + protected final TimeUnit timestampResolution; protected final long baseTime; protected final long expiredSSTableCheckFrequency; protected final long maxWindowSize; @@ -51,7 +52,7 @@ public final class DateTieredCompactionStrategyOptions public DateTieredCompactionStrategyOptions(Map<String, String> options) { String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY); - TimeUnit timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue); + timestampResolution = optionValue == null ? 
DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue); if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION) logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString()); optionValue = options.get(MAX_SSTABLE_AGE_KEY); @@ -68,9 +69,10 @@ public final class DateTieredCompactionStrategyOptions public DateTieredCompactionStrategyOptions() { maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert((long) DEFAULT_MAX_SSTABLE_AGE_DAYS, TimeUnit.DAYS)); - baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS); + timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION; + baseTime = timestampResolution.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS); expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS); - maxWindowSize = DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS); + maxWindowSize = timestampResolution.convert(1, TimeUnit.DAYS); } public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 068d283..b6ad64c 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -38,6 +38,9 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.ISSTableScanner; import 
org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.FBUtilities; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; public class LeveledCompactionStrategy extends AbstractCompactionStrategy { @@ -208,7 +211,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy public int getEstimatedRemainingTasks() { - return manifest.getEstimatedTasks(); + int n = manifest.getEstimatedTasks(); + cfs.getCompactionStrategyManager().compactionLogger.pending(this, n); + return n; } public long getMaxSSTableBytes() @@ -444,6 +449,26 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy return null; } + public CompactionLogger.Strategy strategyLogger() + { + return new CompactionLogger.Strategy() + { + public JsonNode sstable(SSTableReader sstable) + { + ObjectNode node = JsonNodeFactory.instance.objectNode(); + node.put("level", sstable.getSSTableLevel()); + node.put("min_token", sstable.first.getToken().toString()); + node.put("max_token", sstable.last.getToken().toString()); + return node; + } + + public JsonNode options() + { + return null; + } + }; + } + public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException { Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 28bdf5c..8ef2ac7 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -86,6 +86,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize); logger.trace("Compaction buckets are {}", buckets); estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets); + cfs.getCompactionStrategyManager().compactionLogger.pending(this, estimatedRemainingTasks); List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold); if (!mostInteresting.isEmpty()) return mostInteresting;