Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1667494 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1667494 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1667494 Branch: refs/heads/cassandra-3.0 Commit: f16674949592b518ba9da837b70665df10832e9b Parents: 7dd6b7d fc7075a Author: Marcus Eriksson <marc...@apache.org> Authored: Fri Dec 11 17:32:31 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri Dec 11 17:32:31 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionInfo.java | 14 +- .../db/compaction/CompactionManager.java | 14 + .../cassandra/db/compaction/OperationType.java | 3 +- .../io/sstable/IndexSummaryManager.java | 279 +-------------- .../io/sstable/IndexSummaryRedistribution.java | 349 +++++++++++++++++++ .../io/sstable/IndexSummaryManagerTest.java | 83 ++++- 7 files changed, 459 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 30a76a9,2ee8b07..5da0d42 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,31 -1,10 +1,32 @@@ -2.1.13 - * Allow cancellation of index summary redistribution (CASSANDRA-8805) +2.2.5 + * Fix regression in split size on CqlInputFormat (CASSANDRA-10835) + * Better handling of SSL connection errors inter-node (CASSANDRA-10816) * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) + * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761) +Merged from 2.1: ++ * Allow cancellation of index summary redistribution (CASSANDRA-8805) * Fix Stress profile parsing on Windows (CASSANDRA-10808) - -2.1.12 +2.2.4 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225) + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775) + * Reject index queries while the index is building (CASSANDRA-8505) + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747) + * Fix JSON update with prepared statements (CASSANDRA-10631) + * Don't do anticompaction after subrange repair (CASSANDRA-10422) + * Fix SimpleDateType type compatibility (CASSANDRA-10027) + * (Hadoop) fix splits calculation (CASSANDRA-10640) + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) + * Use most up-to-date version of schema for system tables (CASSANDRA-10652) + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) +Merged from 2.1: * Fix incremental repair hang when replica is down (CASSANDRA-10288) * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791) * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 65f93c0,9bddaf5..ba9c25e --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1345,7 -1222,21 +1345,21 @@@ public class CompactionManager implemen return executor.submit(runnable); } + public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException + { + metrics.beginCompaction(redistribution); + + try + { + return redistribution.redistributeSummaries(); + } + finally + { + metrics.finishCompaction(redistribution); + } + } + - static int getDefaultGcBefore(ColumnFamilyStore cfs) + public static int getDefaultGcBefore(ColumnFamilyStore cfs) { // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to // add any GcGrace however since 2ndary indexes are local to a node. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java index a14f13f,475b591..6b66ded --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@@ -32,7 -32,7 +32,8 @@@ public enum OperationTyp TOMBSTONE_COMPACTION("Tombstone Compaction"), UNKNOWN("Unknown compaction type"), ANTICOMPACTION("Anticompaction after repair"), - VERIFY("Verify"); ++ VERIFY("Verify"), + INDEX_SUMMARY("Index summary redistribution"); private final String type; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 1dd3a4e,be5cc3c..4438dc1 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@@ -26,19 -30,19 +26,20 @@@ import javax.management.MBeanServer import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; - import com.google.common.collect.*; - - import org.apache.cassandra.io.sstable.format.SSTableReader; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; ++import com.google.common.collect.ImmutableSet; + import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataTracker; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.View; + import org.apache.cassandra.db.compaction.CompactionManager; ++import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; @@@ -57,13 -59,13 +56,6 @@@ public class IndexSummaryManager implem private int resizeIntervalInMinutes = 0; private long memoryPoolBytes; -- // The target (or ideal) number of index summary entries must differ from the actual number of -- // entries by this ratio in order to trigger an upsample or downsample of the summary. Because -- // upsampling requires reading the primary index in order to rebuild the summary, the threshold -- // for upsampling is is higher. -- static final double UPSAMPLE_THRESHOLD = 1.5; -- static final double DOWNSAMPLE_THESHOLD = 0.75; -- private final DebuggableScheduledThreadPoolExecutor executor; // our next scheduled resizing run @@@ -251,267 -249,8 +243,8 @@@ * @return a list of new SSTableReader instances */ @VisibleForTesting - public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException + public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException { - logger.info("Redistributing index summaries"); - List<SSTableReader> oldFormatSSTables = new ArrayList<>(); - List<SSTableReader> redistribute = new ArrayList<>(); - for (LifecycleTransaction txn : transactions.values()) - { - for (SSTableReader sstable : ImmutableList.copyOf(txn.originals())) - { - // We can't change the sampling level of sstables with the old format, because the serialization format - // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) - logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); - if (!sstable.descriptor.version.hasSamplingLevel()) - { - oldFormatSSTables.add(sstable); - txn.cancel(sstable); - } - } - redistribute.addAll(txn.originals()); - } - - long total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, redistribute)) - total += sstable.getIndexSummaryOffHeapSize(); - - logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", - redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); - - final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size()); - double totalReadsPerSec = 0.0; - for (SSTableReader sstable : redistribute) - { - if (sstable.getReadMeter() != null) - { - Double readRate = sstable.getReadMeter().fifteenMinuteRate(); - totalReadsPerSec += readRate; - readRates.put(sstable, readRate); - } - } - logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec); - - // copy and sort by read rates (ascending) - List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute); - Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); - - long remainingBytes = memoryPoolBytes; - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) - remainingBytes -= sstable.getIndexSummaryOffHeapSize(); - - logger.trace("Index summaries for compacting SSTables are using {} MB of space", - (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0); - List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes); - - for (LifecycleTransaction txn : transactions.values()) - txn.finish(); - - total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) - total += sstable.getIndexSummaryOffHeapSize(); - logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB", - total / 1024.0 / 1024.0); - - return newSSTables; - } - - private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, Map<UUID, LifecycleTransaction> transactions, - double totalReadsPerSec, long memoryPoolCapacity) throws IOException - { - - List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4); - List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4); - List<ResampleEntry> forceResample = new ArrayList<>(); - List<ResampleEntry> forceUpsample = new ArrayList<>(); - List<SSTableReader> newSSTables = new ArrayList<>(sstables.size()); - - // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional - // to the number of total reads/sec it handles. - long remainingSpace = memoryPoolCapacity; - for (SSTableReader sstable : sstables) - { - int minIndexInterval = sstable.metadata.getMinIndexInterval(); - int maxIndexInterval = sstable.metadata.getMaxIndexInterval(); - - double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate(); - long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); - - // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that - int currentNumEntries = sstable.getIndexSummarySize(); - double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries; - long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize)); - int currentSamplingLevel = sstable.getIndexSummarySamplingLevel(); - int maxSummarySize = sstable.getMaxIndexSummarySize(); - - // if the min_index_interval changed, calculate what our current sampling level would be under the new min - if (sstable.getMinIndexInterval() != minIndexInterval) - { - int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval())); - maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval)); - logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})", - sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel); - currentSamplingLevel = effectiveSamplingLevel; - } - - int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries, - minIndexInterval, maxIndexInterval); - int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize); - double effectiveIndexInterval = sstable.getEffectiveIndexInterval(); - - logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " + - "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)", - sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, - currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel, - numEntriesAtNewSamplingLevel * avgEntrySize); - - if (effectiveIndexInterval < minIndexInterval) - { - // The min_index_interval was changed; re-sample to match it. - logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})", - sstable, effectiveIndexInterval, minIndexInterval); - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= spaceUsed; - } - else if (effectiveIndexInterval > maxIndexInterval) - { - // The max_index_interval was lowered; force an upsample to the effective minimum sampling level - logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})", - sstable, effectiveIndexInterval, maxIndexInterval); - newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval); - numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize()); - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; - } - else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel) - { - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; - } - else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel) - { - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= spaceUsed; - } - else - { - // keep the same sampling level - logger.trace("SSTable {} is within thresholds of ideal sampling", sstable); - remainingSpace -= sstable.getIndexSummaryOffHeapSize(); - newSSTables.add(sstable); - transactions.get(sstable.metadata.cfId).cancel(sstable); - } - totalReadsPerSec -= readsPerSec; - } - - if (remainingSpace > 0) - { - Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace); - toDownsample = result.right; - newSSTables.addAll(result.left); - for (SSTableReader sstable : result.left) - transactions.get(sstable.metadata.cfId).cancel(sstable); - } - - // downsample first, then upsample - toDownsample.addAll(forceResample); - toDownsample.addAll(toUpsample); - toDownsample.addAll(forceUpsample); - for (ResampleEntry entry : toDownsample) - { - SSTableReader sstable = entry.sstable; - logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", - sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, - entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); - ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId); - SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); - newSSTables.add(replacement); - transactions.get(sstable.metadata.cfId).update(replacement, true); - } - - return newSSTables; - } - - @VisibleForTesting - static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace) - { - // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations - // that will make little difference. - Collections.sort(toDownsample, new Comparator<ResampleEntry>() - { - public int compare(ResampleEntry o1, ResampleEntry o2) - { - return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed, - o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed); - } - }); - - int noDownsampleCutoff = 0; - List<SSTableReader> willNotDownsample = new ArrayList<>(); - while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size()) - { - ResampleEntry entry = toDownsample.get(noDownsampleCutoff); - - long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed; - // see if we have enough leftover space to keep the current sampling level - if (extraSpaceRequired <= remainingSpace) - { - logger.trace("Using leftover space to keep {} at the current sampling level ({})", - entry.sstable, entry.sstable.getIndexSummarySamplingLevel()); - willNotDownsample.add(entry.sstable); - remainingSpace -= extraSpaceRequired; - } - else - { - break; - } - - noDownsampleCutoff++; - } - return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size())); - } - - private static class ResampleEntry - { - public final SSTableReader sstable; - public final long newSpaceUsed; - public final int newSamplingLevel; - - public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel) - { - this.sstable = sstable; - this.newSpaceUsed = newSpaceUsed; - this.newSamplingLevel = newSamplingLevel; - } - } - - /** Utility class for sorting sstables by their read rates. */ - private static class ReadRateComparator implements Comparator<SSTableReader> - { - private final Map<SSTableReader, Double> readRates; - - public ReadRateComparator(Map<SSTableReader, Double> readRates) - { - this.readRates = readRates; - } - - @Override - public int compare(SSTableReader o1, SSTableReader o2) - { - Double readRate1 = readRates.get(o1); - Double readRate2 = readRates.get(o2); - if (readRate1 == null && readRate2 == null) - return 0; - else if (readRate1 == null) - return -1; - else if (readRate2 == null) - return 1; - else - return Double.compare(readRate1, readRate2); - } - return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes)); ++ return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes)); } - } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java index 0000000,adb3e4e..aad479b mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java @@@ -1,0 -1,338 +1,349 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.io.sstable; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashMap; + import java.util.List; + import java.util.Map; ++import java.util.UUID; + + import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashMultimap; ++import com.google.common.collect.ImmutableList; + import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataTracker; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.compaction.CompactionInfo; + import org.apache.cassandra.db.compaction.CompactionInterruptedException; + import org.apache.cassandra.db.compaction.OperationType; ++import org.apache.cassandra.db.lifecycle.LifecycleTransaction; ++import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.utils.Pair; + + import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; + + public class IndexSummaryRedistribution extends CompactionInfo.Holder + { + private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class); + ++ // The target (or ideal) number of index summary entries must differ from the actual number of ++ // entries by this ratio in order to trigger an upsample or downsample of the summary. Because ++ // upsampling requires reading the primary index in order to rebuild the summary, the threshold ++ // for upsampling is is higher. ++ static final double UPSAMPLE_THRESHOLD = 1.5; ++ static final double DOWNSAMPLE_THESHOLD = 0.75; ++ + private final List<SSTableReader> compacting; - private final List<SSTableReader> nonCompacting; ++ private final Map<UUID, LifecycleTransaction> transactions; + private final long memoryPoolBytes; ++ private final UUID compactionId; + private volatile long remainingSpace; + - public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) ++ public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) + { + this.compacting = compacting; - this.nonCompacting = nonCompacting; ++ this.transactions = transactions; + this.memoryPoolBytes = memoryPoolBytes; ++ this.compactionId = UUID.randomUUID(); + } + + public List<SSTableReader> redistributeSummaries() throws IOException + { - long total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting)) - total += sstable.getIndexSummaryOffHeapSize(); - ++ logger.info("Redistributing index summaries"); + List<SSTableReader> oldFormatSSTables = new ArrayList<>(); - for (SSTableReader sstable : nonCompacting) ++ List<SSTableReader> redistribute = new ArrayList<>(); ++ for (LifecycleTransaction txn : transactions.values()) + { - // We can't change the sampling level of sstables with the old format, because the serialization format - // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) - logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); - if (!sstable.descriptor.version.hasSamplingLevel) - oldFormatSSTables.add(sstable); ++ for (SSTableReader sstable : ImmutableList.copyOf(txn.originals())) ++ { ++ // We can't change the sampling level of sstables with the old format, because the serialization format ++ // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) ++ logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); ++ if (!sstable.descriptor.version.hasSamplingLevel()) ++ { ++ oldFormatSSTables.add(sstable); ++ txn.cancel(sstable); ++ } ++ } ++ redistribute.addAll(txn.originals()); + } - nonCompacting.removeAll(oldFormatSSTables); + - logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", - nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); ++ long total = 0; ++ for (SSTableReader sstable : Iterables.concat(compacting, redistribute)) ++ total += sstable.getIndexSummaryOffHeapSize(); + - final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size()); ++ logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", ++ redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); ++ ++ final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size()); + double totalReadsPerSec = 0.0; - for (SSTableReader sstable : nonCompacting) ++ for (SSTableReader sstable : redistribute) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + if (sstable.getReadMeter() != null) + { + Double readRate = sstable.getReadMeter().fifteenMinuteRate(); + totalReadsPerSec += readRate; + readRates.put(sstable, readRate); + } + } + logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec); + + // copy and sort by read rates (ascending) - List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting); ++ List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute); + Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); + + long remainingBytes = memoryPoolBytes; + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) + remainingBytes -= sstable.getIndexSummaryOffHeapSize(); + + logger.trace("Index summaries for compacting SSTables are using {} MB of space", + (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0); - List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes); ++ List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes); ++ ++ for (LifecycleTransaction txn : transactions.values()) ++ txn.finish(); + + total = 0; + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) + total += sstable.getIndexSummaryOffHeapSize(); - logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB", ++ logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB", + total / 1024.0 / 1024.0); + + return newSSTables; + } + + private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, ++ Map<UUID, LifecycleTransaction> transactions, + double totalReadsPerSec, long memoryPoolCapacity) throws IOException + { - + List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4); + List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4); + List<ResampleEntry> forceResample = new ArrayList<>(); + List<ResampleEntry> forceUpsample = new ArrayList<>(); + List<SSTableReader> newSSTables = new ArrayList<>(sstables.size()); + + // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional + // to the number of total reads/sec it handles. + remainingSpace = memoryPoolCapacity; + for (SSTableReader sstable : sstables) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + int minIndexInterval = sstable.metadata.getMinIndexInterval(); + int maxIndexInterval = sstable.metadata.getMaxIndexInterval(); + + double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate(); + long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); + + // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that + int currentNumEntries = sstable.getIndexSummarySize(); + double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries; + long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize)); + int currentSamplingLevel = sstable.getIndexSummarySamplingLevel(); + int maxSummarySize = sstable.getMaxIndexSummarySize(); + + // if the min_index_interval changed, calculate what our current sampling level would be under the new min + if (sstable.getMinIndexInterval() != minIndexInterval) + { + int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval())); + maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval)); + logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})", + sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel); + currentSamplingLevel = effectiveSamplingLevel; + } + + int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries, - minIndexInterval, maxIndexInterval); ++ minIndexInterval, maxIndexInterval); + int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize); + double effectiveIndexInterval = sstable.getEffectiveIndexInterval(); + + logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " + - "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)", - sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, - currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel, - numEntriesAtNewSamplingLevel * avgEntrySize); ++ "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)", ++ sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, ++ currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel, ++ numEntriesAtNewSamplingLevel * avgEntrySize); + + if (effectiveIndexInterval < minIndexInterval) + { + // The min_index_interval was changed; re-sample to match it. - logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})", - sstable, effectiveIndexInterval, minIndexInterval); ++ logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})", ++ sstable, effectiveIndexInterval, minIndexInterval); + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= spaceUsed; + } + else if (effectiveIndexInterval > maxIndexInterval) + { + // The max_index_interval was lowered; force an upsample to the effective minimum sampling level - logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})", - sstable, effectiveIndexInterval, maxIndexInterval); ++ logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})", ++ sstable, effectiveIndexInterval, maxIndexInterval); + newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval); + numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize()); + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; + } - else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel) ++ else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel) + { + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; + } - else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel) ++ else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel) + { + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= spaceUsed; + } + else + { + // keep the same sampling level + logger.trace("SSTable {} is within thresholds of ideal sampling", sstable); + remainingSpace -= sstable.getIndexSummaryOffHeapSize(); + newSSTables.add(sstable); ++ transactions.get(sstable.metadata.cfId).cancel(sstable); + } + totalReadsPerSec -= readsPerSec; + } + + if (remainingSpace > 0) + { + Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace); + toDownsample = result.right; + newSSTables.addAll(result.left); ++ for (SSTableReader sstable : result.left) ++ transactions.get(sstable.metadata.cfId).cancel(sstable); + } + + // downsample first, then upsample + toDownsample.addAll(forceResample); + toDownsample.addAll(toUpsample); + toDownsample.addAll(forceUpsample); - Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create(); - Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create(); - - try ++ for (ResampleEntry entry : toDownsample) + { - for (ResampleEntry entry : toDownsample) - { - if (isStopRequested()) - throw new CompactionInterruptedException(getCompactionInfo()); - - SSTableReader sstable = entry.sstable; - logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", - sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, - entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); - ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()); - DataTracker tracker = cfs.getDataTracker(); - SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); - newSSTables.add(replacement); - replacedByTracker.put(tracker, sstable); - replacementsByTracker.put(tracker, replacement); - } - } - finally - { - for (DataTracker tracker : replacedByTracker.keySet()) - tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker)); ++ if (isStopRequested()) ++ throw new CompactionInterruptedException(getCompactionInfo()); ++ ++ SSTableReader sstable = entry.sstable; ++ logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", ++ sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, ++ entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); ++ ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId); ++ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); ++ newSSTables.add(replacement); ++ transactions.get(sstable.metadata.cfId).update(replacement, true); + } + + return newSSTables; + } + + @VisibleForTesting + static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace) + { + // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations + // that will make little difference. + Collections.sort(toDownsample, new Comparator<ResampleEntry>() + { + public int compare(ResampleEntry o1, ResampleEntry o2) + { + return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed, + o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed); + } + }); + + int noDownsampleCutoff = 0; + List<SSTableReader> willNotDownsample = new ArrayList<>(); + while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size()) + { + ResampleEntry entry = toDownsample.get(noDownsampleCutoff); + + long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed; + // see if we have enough leftover space to keep the current sampling level + if (extraSpaceRequired <= remainingSpace) + { + logger.trace("Using leftover space to keep {} at the current sampling level ({})", + entry.sstable, entry.sstable.getIndexSummarySamplingLevel()); + willNotDownsample.add(entry.sstable); + remainingSpace -= extraSpaceRequired; + } + else + { + break; + } + + noDownsampleCutoff++; + } + return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size())); + } + + public CompactionInfo getCompactionInfo() + { - return new CompactionInfo(OperationType.INDEX_SUMMARY, (remainingSpace - memoryPoolBytes), memoryPoolBytes, "bytes"); ++ return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId); + } + + /** Utility class for sorting sstables by their read rates. */ + private static class ReadRateComparator implements Comparator<SSTableReader> + { + private final Map<SSTableReader, Double> readRates; + + ReadRateComparator(Map<SSTableReader, Double> readRates) + { + this.readRates = readRates; + } + + @Override + public int compare(SSTableReader o1, SSTableReader o2) + { + Double readRate1 = readRates.get(o1); + Double readRate2 = readRates.get(o2); + if (readRate1 == null && readRate2 == null) + return 0; + else if (readRate1 == null) + return -1; + else if (readRate2 == null) + return 1; + else + return Double.compare(readRate1, readRate2); + } + } + + private static class ResampleEntry + { + public final SSTableReader sstable; + public final long newSpaceUsed; + public final int newSamplingLevel; + + ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel) + { + this.sstable = sstable; + this.newSpaceUsed = newSpaceUsed; + this.newSamplingLevel = newSamplingLevel; + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 5e46b8e,63928e2..6935680 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@@ -21,38 -21,33 +21,41 @@@ import java.io.IOException import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; --import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicReference; - import org.apache.cassandra.io.sstable.format.SSTableReader; + import com.google.common.base.Joiner; + import com.google.common.collect.Sets; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; --import junit.framework.Assert; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.cache.CachingOptions; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; + import org.apache.cassandra.db.compaction.CompactionInfo; + import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.QueryFilter; ++import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.exceptions.ConfigurationException; ++import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; - import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.metrics.RestorableMeter; +import static com.google.common.collect.ImmutableMap.of; +import static java.util.Arrays.asList; import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; --import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD; --import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD; ++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD; ++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD; import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@@ -106,8 -78,13 +109,13 @@@ public class IndexSummaryManagerTes @After public void afterTest() { + for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions()) + { + holder.stop(); + } + - String ksname = "Keyspace1"; - String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching + String ksname = KEYSPACE1; + String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching Keyspace keyspace = Keyspace.open(ksname); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); cfs.metadata.minIndexInterval(originalMinIndexInterval); @@@ -581,4 -507,59 +589,65 @@@ assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval()); } } + + @Test + public void testCancelIndex() throws Exception + { - String ksname = "Keyspace1"; - String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching ++ String ksname = KEYSPACE1; ++ String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching + Keyspace keyspace = Keyspace.open(ksname); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); ++ final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + final int numSSTables = 4; ++ final int numTries = 4; + int numRows = 256; + createSSTables(ksname, cfname, numSSTables, numRows); + + final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables()); + for (SSTableReader sstable : sstables) + sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0)); + + final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize(); + + // everything should get cut in half + final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>(); ++ + Thread t = new Thread(new Runnable() + { + public void run() + { + try + { - redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2))); ++ // Don't leave enough space for even the minimal index summaries ++ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) ++ { ++ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace); ++ } + } + catch (CompactionInterruptedException ex) + { + exception.set(ex); + } + catch (IOException ignored) + { + } + } + }); + t.start(); + while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive()) - Thread.sleep(1); ++ Thread.yield(); + CompactionManager.instance.stopCompaction("INDEX_SUMMARY"); + t.join(); + + assertNotNull("Expected compaction interrupted exception", exception.get()); + assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty()); + + Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables); + Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables()); + Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables); + assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s", + Joiner.on(",").join(disjoint)), + disjoint.isEmpty()); + + validateData(cfs, numRows); + } }