Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65885e7f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65885e7f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65885e7f Branch: refs/heads/cassandra-3.0 Commit: 65885e7fc356c342331aec11667b5abdc28897b6 Parents: b55523e f166749 Author: Marcus Eriksson <marc...@apache.org> Authored: Fri Dec 11 17:34:21 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri Dec 11 17:37:41 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../cassandra/db/compaction/CompactionInfo.java | 14 +- .../db/compaction/CompactionManager.java | 15 + .../cassandra/db/compaction/OperationType.java | 3 +- .../io/sstable/IndexSummaryManager.java | 279 +-------------- .../io/sstable/IndexSummaryRedistribution.java | 349 +++++++++++++++++++ .../io/sstable/IndexSummaryManagerTest.java | 80 ++++- 7 files changed, 462 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 9c01160,5da0d42..5932dbb --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,34 -1,13 +1,37 @@@ -2.2.5 - * Fix regression in split size on CqlInputFormat (CASSANDRA-10835) - * Better handling of SSL connection errors inter-node (CASSANDRA-10816) - * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) - * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761) ++3.0.2 + Merged from 2.1: + * Allow cancellation of index summary redistribution (CASSANDRA-8805) - * Fix Stress profile parsing on Windows (CASSANDRA-10808) - -2.2.4 +3.0.1 + * Avoid MV race during node decommission (CASSANDRA-10674) + * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) + * Handle 
single-column deletions correctly in materialized views + when the column is part of the view primary key (CASSANDRA-10796) + * Fix issue with datadir migration on upgrade (CASSANDRA-10788) + * Fix bug with range tombstones on reverse queries and test coverage for + AbstractBTreePartition (CASSANDRA-10059) + * Remove 64k limit on collection elements (CASSANDRA-10374) + * Remove unclear Indexer.indexes() method (CASSANDRA-10690) + * Fix NPE on stream read error (CASSANDRA-10771) + * Normalize cqlsh DESC output (CASSANDRA-10431) + * Rejects partition range deletions when columns are specified (CASSANDRA-10739) + * Fix error when saving cached key for old format sstable (CASSANDRA-10778) + * Invalidate prepared statements on DROP INDEX (CASSANDRA-10758) + * Fix SELECT statement with IN restrictions on partition key, + ORDER BY and LIMIT (CASSANDRA-10729) + * Improve stress performance over 1k threads (CASSANDRA-7217) + * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731) + * Unable to create a function with argument of type Inet (CASSANDRA-10741) + * Fix backward incompatibility in CqlInputFormat (CASSANDRA-10717) + * Correctly preserve deletion info on updated rows when notifying indexers + of single-row deletions (CASSANDRA-10694) + * Notify indexers of partition delete during cleanup (CASSANDRA-10685) + * Keep the file open in trySkipCache (CASSANDRA-10669) + * Updated trigger example (CASSANDRA-10257) +Merged from 2.2: + * Fix regression on split size in CqlInputFormat (CASSANDRA-10835) + * Better handling of SSL connection errors inter-node (CASSANDRA-10816) + * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761) + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) * Show CQL help in cqlsh in web browser (CASSANDRA-7225) * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775) * Reject index queries while the index is building (CASSANDRA-8505) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 3ce7d2c,ba9c25e..bd950e3 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -50,11 -47,7 +50,12 @@@ import org.apache.cassandra.db.view.Vie import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; ++import org.apache.cassandra.io.sstable.IndexSummaryRedistribution; +import org.apache.cassandra.io.sstable.SSTableRewriter; +import org.apache.cassandra.io.sstable.SnapshotDeletingTask; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@@ -1322,7 -1345,21 +1323,21 @@@ public class CompactionManager implemen return executor.submit(runnable); } + public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException + { + metrics.beginCompaction(redistribution); + + try + { + return redistribution.redistributeSummaries(); + } + finally + { + metrics.finishCompaction(redistribution); + } + } + - public static int getDefaultGcBefore(ColumnFamilyStore cfs) + public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec) { // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to // add any GcGrace however since 2ndary indexes are local to a node. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java index a69622b,6b66ded..20e6df2 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@@ -33,13 -33,9 +33,14 @@@ public enum OperationTyp UNKNOWN("Unknown compaction type"), ANTICOMPACTION("Anticompaction after repair"), VERIFY("Verify"), + FLUSH("Flush"), + STREAM("Stream"), + WRITE("Write"), - VIEW_BUILD("View build"); ++ VIEW_BUILD("View build"), + INDEX_SUMMARY("Index summary redistribution"); - private final String type; + public final String type; + public final String fileName; OperationType(String type) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index e07f297,4438dc1..aed35c9 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@@ -26,10 -26,8 +26,7 @@@ import javax.management.MBeanServer import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; +import com.google.common.collect.*; - - import org.apache.cassandra.db.lifecycle.SSTableSet; - import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -40,6 -38,8 +37,9 @@@ import org.apache.cassandra.db.compacti import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.View; 
++import org.apache.cassandra.db.lifecycle.SSTableSet; + import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java index 0000000,aad479b..b4eae31 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java @@@ -1,0 -1,349 +1,349 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.io.sstable; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.UUID; + + import com.google.common.annotations.VisibleForTesting; + import com.google.common.collect.ImmutableList; + import com.google.common.collect.Iterables; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.compaction.CompactionInfo; + import org.apache.cassandra.db.compaction.CompactionInterruptedException; + import org.apache.cassandra.db.compaction.OperationType; + import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.utils.Pair; + + import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; + + public class IndexSummaryRedistribution extends CompactionInfo.Holder + { + private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class); + + // The target (or ideal) number of index summary entries must differ from the actual number of + // entries by this ratio in order to trigger an upsample or downsample of the summary. Because + // upsampling requires reading the primary index in order to rebuild the summary, the threshold + // for upsampling is higher. 
+ static final double UPSAMPLE_THRESHOLD = 1.5; + static final double DOWNSAMPLE_THESHOLD = 0.75; + + private final List<SSTableReader> compacting; + private final Map<UUID, LifecycleTransaction> transactions; + private final long memoryPoolBytes; + private final UUID compactionId; + private volatile long remainingSpace; + + public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) + { + this.compacting = compacting; + this.transactions = transactions; + this.memoryPoolBytes = memoryPoolBytes; + this.compactionId = UUID.randomUUID(); + } + + public List<SSTableReader> redistributeSummaries() throws IOException + { + logger.info("Redistributing index summaries"); + List<SSTableReader> oldFormatSSTables = new ArrayList<>(); + List<SSTableReader> redistribute = new ArrayList<>(); + for (LifecycleTransaction txn : transactions.values()) + { + for (SSTableReader sstable : ImmutableList.copyOf(txn.originals())) + { + // We can't change the sampling level of sstables with the old format, because the serialization format + // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) 
+ logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); + if (!sstable.descriptor.version.hasSamplingLevel()) + { + oldFormatSSTables.add(sstable); + txn.cancel(sstable); + } + } + redistribute.addAll(txn.originals()); + } + + long total = 0; + for (SSTableReader sstable : Iterables.concat(compacting, redistribute)) + total += sstable.getIndexSummaryOffHeapSize(); + + logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", + redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); + + final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size()); + double totalReadsPerSec = 0.0; + for (SSTableReader sstable : redistribute) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + if (sstable.getReadMeter() != null) + { + Double readRate = sstable.getReadMeter().fifteenMinuteRate(); + totalReadsPerSec += readRate; + readRates.put(sstable, readRate); + } + } + logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec); + + // copy and sort by read rates (ascending) + List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute); + Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); + + long remainingBytes = memoryPoolBytes; + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) + remainingBytes -= sstable.getIndexSummaryOffHeapSize(); + + logger.trace("Index summaries for compacting SSTables are using {} MB of space", + (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0); + List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes); + + for (LifecycleTransaction txn : transactions.values()) + txn.finish(); + + total = 0; + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) + total 
+= sstable.getIndexSummaryOffHeapSize(); + logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB", + total / 1024.0 / 1024.0); + + return newSSTables; + } + + private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, + Map<UUID, LifecycleTransaction> transactions, + double totalReadsPerSec, long memoryPoolCapacity) throws IOException + { + List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4); + List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4); + List<ResampleEntry> forceResample = new ArrayList<>(); + List<ResampleEntry> forceUpsample = new ArrayList<>(); + List<SSTableReader> newSSTables = new ArrayList<>(sstables.size()); + + // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional + // to the number of total reads/sec it handles. + remainingSpace = memoryPoolCapacity; + for (SSTableReader sstable : sstables) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + - int minIndexInterval = sstable.metadata.getMinIndexInterval(); - int maxIndexInterval = sstable.metadata.getMaxIndexInterval(); ++ int minIndexInterval = sstable.metadata.params.minIndexInterval; ++ int maxIndexInterval = sstable.metadata.params.maxIndexInterval; + + double readsPerSec = sstable.getReadMeter() == null ? 
0.0 : sstable.getReadMeter().fifteenMinuteRate(); + long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); + + // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that + int currentNumEntries = sstable.getIndexSummarySize(); + double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries; + long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize)); + int currentSamplingLevel = sstable.getIndexSummarySamplingLevel(); + int maxSummarySize = sstable.getMaxIndexSummarySize(); + + // if the min_index_interval changed, calculate what our current sampling level would be under the new min + if (sstable.getMinIndexInterval() != minIndexInterval) + { + int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval())); + maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval)); + logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})", + sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel); + currentSamplingLevel = effectiveSamplingLevel; + } + + int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries, + minIndexInterval, maxIndexInterval); + int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize); + double effectiveIndexInterval = sstable.getEffectiveIndexInterval(); + + logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " + + "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)", + sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, + currentNumEntries * avgEntrySize, 
newSamplingLevel, numEntriesAtNewSamplingLevel, + numEntriesAtNewSamplingLevel * avgEntrySize); + + if (effectiveIndexInterval < minIndexInterval) + { + // The min_index_interval was changed; re-sample to match it. + logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})", + sstable, effectiveIndexInterval, minIndexInterval); + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= spaceUsed; + } + else if (effectiveIndexInterval > maxIndexInterval) + { + // The max_index_interval was lowered; force an upsample to the effective minimum sampling level + logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})", + sstable, effectiveIndexInterval, maxIndexInterval); + newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval); + numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize()); + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; + } + else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel) + { + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; + } + else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel) + { + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= 
spaceUsed; + } + else + { + // keep the same sampling level + logger.trace("SSTable {} is within thresholds of ideal sampling", sstable); + remainingSpace -= sstable.getIndexSummaryOffHeapSize(); + newSSTables.add(sstable); + transactions.get(sstable.metadata.cfId).cancel(sstable); + } + totalReadsPerSec -= readsPerSec; + } + + if (remainingSpace > 0) + { + Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace); + toDownsample = result.right; + newSSTables.addAll(result.left); + for (SSTableReader sstable : result.left) + transactions.get(sstable.metadata.cfId).cancel(sstable); + } + + // downsample first, then upsample + toDownsample.addAll(forceResample); + toDownsample.addAll(toUpsample); + toDownsample.addAll(forceUpsample); + for (ResampleEntry entry : toDownsample) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + SSTableReader sstable = entry.sstable; + logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", + sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, + entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); + ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId); + SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); + newSSTables.add(replacement); + transactions.get(sstable.metadata.cfId).update(replacement, true); + } + + return newSSTables; + } + + @VisibleForTesting + static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace) + { + // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations + // that will make little difference. 
+ Collections.sort(toDownsample, new Comparator<ResampleEntry>() + { + public int compare(ResampleEntry o1, ResampleEntry o2) + { + return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed, + o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed); + } + }); + + int noDownsampleCutoff = 0; + List<SSTableReader> willNotDownsample = new ArrayList<>(); + while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size()) + { + ResampleEntry entry = toDownsample.get(noDownsampleCutoff); + + long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed; + // see if we have enough leftover space to keep the current sampling level + if (extraSpaceRequired <= remainingSpace) + { + logger.trace("Using leftover space to keep {} at the current sampling level ({})", + entry.sstable, entry.sstable.getIndexSummarySamplingLevel()); + willNotDownsample.add(entry.sstable); + remainingSpace -= extraSpaceRequired; + } + else + { + break; + } + + noDownsampleCutoff++; + } + return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size())); + } + + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId); + } + + /** Utility class for sorting sstables by their read rates. 
*/ + private static class ReadRateComparator implements Comparator<SSTableReader> + { + private final Map<SSTableReader, Double> readRates; + + ReadRateComparator(Map<SSTableReader, Double> readRates) + { + this.readRates = readRates; + } + + @Override + public int compare(SSTableReader o1, SSTableReader o2) + { + Double readRate1 = readRates.get(o1); + Double readRate2 = readRates.get(o2); + if (readRate1 == null && readRate2 == null) + return 0; + else if (readRate1 == null) + return -1; + else if (readRate2 == null) + return 1; + else + return Double.compare(readRate1, readRate2); + } + } + + private static class ResampleEntry + { + public final SSTableReader sstable; + public final long newSpaceUsed; + public final int newSamplingLevel; + + ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel) + { + this.sstable = sstable; + this.newSpaceUsed = newSpaceUsed; + this.newSamplingLevel = newSamplingLevel; + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 5493edb,6935680..0498c68 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@@ -19,15 -19,12 +19,20 @@@ package org.apache.cassandra.io.sstable import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; ++import java.util.HashSet; +import java.util.List; +import java.util.Map; ++import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + import 
com.google.common.base.Joiner; + import com.google.common.collect.Sets; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@@ -39,18 -36,20 +44,22 @@@ import org.slf4j.LoggerFactory import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.cache.CachingOptions; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.db.compaction.CompactionInfo; + import org.apache.cassandra.db.compaction.CompactionInterruptedException; + import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.filter.QueryFilter; + import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; - import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.locator.SimpleStrategy; + import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; import static com.google.common.collect.ImmutableMap.of; import static java.util.Arrays.asList; @@@ -587,7 -586,68 +601,67 @@@ public class IndexSummaryManagerTes for (Map.Entry<String, Integer> entry : intervals.entrySet()) { if (entry.getKey().contains(CF_STANDARDLOWiINTERVAL)) - assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval()); + assertTrue(entry.getValue() >= cfs.metadata.params.minIndexInterval); } } + + @Test + public void 
testCancelIndex() throws Exception + { + String ksname = KEYSPACE1; + String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching + Keyspace keyspace = Keyspace.open(ksname); + final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + final int numSSTables = 4; - final int numTries = 4; + int numRows = 256; + createSSTables(ksname, cfname, numSSTables, numRows); + - final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables()); ++ final List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + for (SSTableReader sstable : sstables) + sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0)); + + final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize(); + + // everything should get cut in half + final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>(); + + Thread t = new Thread(new Runnable() + { + public void run() + { + try + { + // Don't leave enough space for even the minimal index summaries + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace); + } + } + catch (CompactionInterruptedException ex) + { + exception.set(ex); + } + catch (IOException ignored) + { + } + } + }); + t.start(); + while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive()) - Thread.yield(); ++ Thread.sleep(1); + CompactionManager.instance.stopCompaction("INDEX_SUMMARY"); + t.join(); + + assertNotNull("Expected compaction interrupted exception", exception.get()); + assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty()); + + Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables); - Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables()); ++ Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables()); + Set<SSTableReader> 
disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables); + assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s", + Joiner.on(",").join(disjoint)), + disjoint.isEmpty()); + + validateData(cfs, numRows); + } }