This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f9b46aeb3123e3d387837aa8525126b0a450851b
Merge: f6c1dea 4b547f1
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu Oct 31 14:34:06 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |  5 ++
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 43 +++++++++++--
 .../cassandra/db/compaction/CompactionInfo.java    |  8 ++-
 .../db/compaction/CompactionIterator.java          |  5 ++
 .../cassandra/db/compaction/CompactionManager.java | 26 ++++++++
 .../cassandra/db/compaction/CompactionTask.java    | 10 +--
 .../apache/cassandra/db/compaction/Scrubber.java   |  5 ++
 .../apache/cassandra/db/compaction/Verifier.java   |  5 ++
 .../apache/cassandra/db/view/ViewBuilderTask.java  |  5 ++
 .../cassandra/index/SecondaryIndexBuilder.java     |  5 ++
 .../cassandra/io/sstable/IndexSummaryManager.java  |  2 +
 .../io/sstable/IndexSummaryRedistribution.java     |  6 ++
 .../db/compaction/CancelCompactionsTest.java       | 72 +++++++++++++++++++++-
 .../db/repair/PendingAntiCompactionTest.java       | 15 +++++
 .../io/sstable/IndexSummaryManagerTest.java        | 46 +++++++++++++-
 16 files changed, 246 insertions(+), 13 deletions(-)

diff --cc CHANGES.txt
index f7bb517,4240841..201ab60
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -2,400 -5,9 +2,401 @@@
  Merged from 2.2:
   * In-JVM DTest: Set correct internode message version for upgrade test 
(CASSANDRA-15371)
  
 +4.0-alpha2
 + * Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169)
 + * Upgrade Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT) 
(CASSANDRA-14655)
 + * Extract an AbstractCompactionController to allow for custom 
implementations (CASSANDRA-15286)
 + * Move chronicle-core version from snapshot to stable, and include 
carrotsearch in generated pom.xml (CASSANDRA-15321)
 + * Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) 
correctly (CASSANDRA-15163)
 + * Add `allocate_tokens_for_local_replication_factor` option for token 
allocation (CASSANDRA-15260)
 + * Add Alibaba Cloud Platform snitch (CASSANDRA-15092)
 +Merged from 3.0:
++ * Make sure index summary redistribution does not start when compactions are 
paused (CASSANDRA-15265)
 + * Add ability to cap max negotiable protocol version (CASSANDRA-15193)
 + * Gossip tokens on startup if available (CASSANDRA-15335)
 + * Fix resource leak in CompressedSequentialWriter (CASSANDRA-15340)
 + * Fix bad merge that reverted CASSANDRA-14993 (CASSANDRA-15289)
 + * Add support for network topology and query tracing for inJVM dtest 
(CASSANDRA-15319)
 +
 +
 +4.0-alpha1
 + * Inaccurate exception message with nodetool snapshot (CASSANDRA-15287)
 + * Fix InternodeOutboundMetrics overloaded bytes/count mixup (CASSANDRA-15186)
 + * Enhance & reenable RepairTest with compression=off and compression=on 
(CASSANDRA-15272)
 + * Improve readability of Table metrics Virtual tables units (CASSANDRA-15194)
 + * Fix error with non-existent table for nodetool tablehistograms 
(CASSANDRA-14410)
 + * Avoid result truncation in decimal operations (CASSANDRA-15232)
 + * Catch non-IOException in FileUtils.close to make sure that all resources 
are closed (CASSANDRA-15225)
 + * Align load column in nodetool status output (CASSANDRA-14787)
 + * CassandraNetworkAuthorizer uses cached roles info (CASSANDRA-15089)
 + * Introduce optional timeouts for idle client sessions (CASSANDRA-11097)
 + * Fix AlterTableStatement dropped type validation order (CASSANDRA-15203)
 + * Update Netty dependencies to latest, clean up SocketFactory 
(CASSANDRA-15195)
 + * Native Transport - Apply noSpamLogger to ConnectionLimitHandler 
(CASSANDRA-15167)
 + * Reduce heap pressure during compactions (CASSANDRA-14654)
 + * Support building Cassandra with JDK 11 (CASSANDRA-15108)
 + * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710)
 + * Take sstable references before calculating approximate key count 
(CASSANDRA-14647)
 + * Restore snapshotting of system keyspaces on version change 
(CASSANDRA-14412)
 + * Fix AbstractBTreePartition locking in java 11 (CASSANDRA-14607)
 + * SimpleClient should pass connection properties as options (CASSANDRA-15056)
 + * Set repaired data tracking flag on range reads if enabled (CASSANDRA-15019)
 + * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802)
 + * Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033)
 + * Add Zstd compressor (CASSANDRA-14482)
 + * Fix IR prepare anti-compaction race (CASSANDRA-15027)
 + * Fix SimpleStrategy option validation (CASSANDRA-15007)
 + * Don't try to cancel 2i compactions when starting anticompaction 
(CASSANDRA-15024)
 + * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025)
 + * SSL Cert Hot Reloading should check for sanity of the new 
keystore/truststore before loading it (CASSANDRA-14991)
 + * Avoid leaking threads when failing anticompactions and rate limit 
anticompactions (CASSANDRA-15002)
 + * Validate token() arguments early instead of throwing NPE at execution 
(CASSANDRA-14989)
 + * Add a new tool to dump audit logs (CASSANDRA-14885)
 + * Fix generating javadoc with Java11 (CASSANDRA-14988)
 + * Only cancel conflicting compactions when starting anticompactions and sub 
range compactions (CASSANDRA-14935)
 + * Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938)
 + * Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
 + * Make antiCompactGroup throw exception on error and anticompaction non 
cancellable
 +   again (CASSANDRA-14936)
 + * Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849)
 + * Auto-expand replication_factor for NetworkTopologyStrategy 
(CASSANDRA-14303)
 + * Transient Replication: support EACH_QUORUM (CASSANDRA-14727)
 + * BufferPool: allocating thread for new chunks should acquire directly 
(CASSANDRA-14832)
 + * Send correct messaging version in internode messaging handshake's third 
message (CASSANDRA-14896)
 + * Make Read and Write Latency columns consistent for proxyhistograms and 
tablehistograms (CASSANDRA-11939)
 + * Make protocol checksum type option case insensitive (CASSANDRA-14716)
 + * Forbid re-adding static columns as regular and vice versa (CASSANDRA-14913)
 + * Audit log allows system keyspaces to be audited via configuration options 
(CASSANDRA-14498)
 + * Lower default chunk_length_in_kb from 64kb to 16kb (CASSANDRA-13241)
 + * Startup checker should wait for count rather than percentage 
(CASSANDRA-14297)
 + * Fix incorrect sorting of replicas in 
SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862)
 + * Partitioned outbound internode TCP connections can occur when nodes 
restart (CASSANDRA-14358)
 + * Don't write to system_distributed.repair_history, system_traces.sessions, 
system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841)
 + * Avoid running query to self through messaging service (CASSANDRA-14807)
 + * Allow using custom script for chronicle queue BinLog archival 
(CASSANDRA-14373)
 + * Transient->Full range movements mishandle consistency level upgrade 
(CASSANDRA-14759)
 + * ReplicaCollection follow-up (CASSANDRA-14726)
 + * Transient node receives full data requests (CASSANDRA-14762)
 + * Enable snapshot artifacts publish (CASSANDRA-12704)
 + * Introduce RangesAtEndpoint.unwrap to simplify 
StreamSession.addTransferRanges (CASSANDRA-14770)
 + * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout 
instead of Unavailable (CASSANDRA-14735)
 + * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
 + * Fail incremental repair prepare phase if it encounters sstables from 
un-finalized sessions (CASSANDRA-14763)
 + * Add a check for receiving digest response from transient node 
(CASSANDRA-14750)
 + * Fail query on transient replica if coordinator only expects full data 
(CASSANDRA-14704)
 + * Remove mentions of transient replication from repair path (CASSANDRA-14698)
 + * Fix handleRepairStatusChangedNotification to remove first then add 
(CASSANDRA-14720)
 + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
 + * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot 
returns wrong value for size() and incorrectly calculates count 
(CASSANDRA-14696)
 + * AbstractReplicaCollection equals and hash code should throw due to 
conflict between order sensitive/insensitive uses (CASSANDRA-14700)
 + * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
 + * Add checksumming to the native protocol (CASSANDRA-13304)
 + * Make AuthCache more easily extendable (CASSANDRA-14662)
 + * Extend RolesCache to include detailed role info (CASSANDRA-14497)
 + * Add fqltool compare (CASSANDRA-14619)
 + * Add fqltool replay (CASSANDRA-14618)
 + * Log keyspace in full query log (CASSANDRA-14656)
 + * Transient Replication and Cheap Quorums (CASSANDRA-14404)
 + * Log server-generated timestamp and nowInSeconds used by queries in FQL 
(CASSANDRA-14675)
 + * Add diagnostic events for read repairs (CASSANDRA-14668)
 + * Use consistent nowInSeconds and timestamps values within a request 
(CASSANDRA-14671)
 + * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
 + * Clean up Message.Request implementations (CASSANDRA-14677)
 + * Disable old native protocol versions on demand (CASSANDRA-14659)
 + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664)
 + * Improve BTree build performance by avoiding data copy (CASSANDRA-9989)
 + * Make monotonic read / read repair configurable (CASSANDRA-14635)
 + * Refactor CompactionStrategyManager (CASSANDRA-14621)
 + * Flush netty client messages immediately by default (CASSANDRA-13651)
 + * Improve read repair blocking behavior (CASSANDRA-10726)
 + * Add a virtual table to expose settings (CASSANDRA-14573)
 + * Fix up chunk cache handling of metrics (CASSANDRA-14628)
 + * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652)
 + * Incomplete handling of exceptions when decoding incoming messages 
(CASSANDRA-14574)
 + * Add diagnostic events for user audit logging (CASSANDRA-13668)
 + * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
 + * Add base classes for diagnostic events (CASSANDRA-13457)
 + * Clear view system metadata when dropping keyspace (CASSANDRA-14646)
 + * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase 
(CASSANDRA-14637)
 + * Make all existing virtual tables use LocalPartitioner (CASSANDRA-14640)
 + * Revert 4.0 GC alg back to CMS (CASSANDRA-14636)
 + * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627)
 + * Update netty to 4.1.128 (CASSANDRA-14633)
 + * Add a virtual table to expose thread pools (CASSANDRA-14523)
 + * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626)
 + * Fix toDate function for timestamp arguments (CASSANDRA-14502)
 + * Revert running dtests by default in circleci (CASSANDRA-14614)
 + * Stream entire SSTables when possible (CASSANDRA-14556)
 + * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592)
 + * Add experimental support for Java 11 (CASSANDRA-9608)
 + * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable 
(CASSANDRA-14580)
 + * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)
 + * Set broadcast address in internode messaging handshake (CASSANDRA-14579)
 + * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
 + * Make all DDL statements idempotent and not dependent on global state 
(CASSANDRA-13426)
 + * Bump the hints messaging version to match the current one (CASSANDRA-14536)
 + * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind 
errors in CircleCI (CASSANDRA-14546)
 + * Report why native_transport_port fails to bind (CASSANDRA-14544)
 + * Optimize internode messaging protocol (CASSANDRA-14485)
 + * Internode messaging handshake sends wrong messaging version number 
(CASSANDRA-14540)
 + * Add a virtual table to expose active client connections (CASSANDRA-14458)
 + * Clean up and refactor client metrics (CASSANDRA-14524)
 + * Nodetool import row cache invalidation races with adding sstables to 
tracker (CASSANDRA-14529)
 + * Fix assertions in LWTs after TableMetadata was made immutable 
(CASSANDRA-14356)
 + * Abort compactions quicker (CASSANDRA-14397)
 + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
 + * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
 + * Add option to sanity check tombstones on reads/compactions 
(CASSANDRA-14467)
 + * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
 + * Let nodetool import take a list of directories (CASSANDRA-14442)
 + * Avoid unneeded memory allocations / cpu for disabled log levels 
(CASSANDRA-14488)
 + * Implement virtual keyspace interface (CASSANDRA-7622)
 + * nodetool import cleanup and improvements (CASSANDRA-14417)
 + * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
 + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
 + * Audit logging for database activity (CASSANDRA-12151)
 + * Clean up build artifacts in docs container (CASSANDRA-14432)
 + * Minor network authz improvements (CASSANDRA-14413)
 + * Automatic sstable upgrades (CASSANDRA-14197)
 + * Replace deprecated junit.framework.Assert usages with org.junit.Assert 
(CASSANDRA-14431)
 + * Cassandra-stress throws NPE if insert section isn't specified in user 
profile (CASSANDRA-14426)
 + * List clients by protocol versions `nodetool clientstats --by-protocol` 
(CASSANDRA-14335)
 + * Improve LatencyMetrics performance by reducing write path processing 
(CASSANDRA-14281)
 + * Add network authz (CASSANDRA-13985)
 + * Use the correct IP/Port for Streaming when localAddress is left unbound 
(CASSANDRA-14389)
 + * nodetool listsnapshots is missing local system keyspace snapshots 
(CASSANDRA-14381)
 + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
 + * Rename nodetool --with-port to --print-port to disambiguate from --port 
(CASSANDRA-14392)
 + * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398)
 + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719)
 + * Eliminate background repair and probabilistic read_repair_chance table 
options
 +   (CASSANDRA-13910)
 + * Bind to correct local address in 4.0 streaming (CASSANDRA-14362)
 + * Use standard Amazon naming for datacenter and rack in Ec2Snitch 
(CASSANDRA-7839)
 + * Fix junit failure for SSTableReaderTest (CASSANDRA-14387)
 + * Abstract write path for pluggable storage (CASSANDRA-14118)
 + * nodetool describecluster should be more informative (CASSANDRA-13853)
 + * Compaction performance improvements (CASSANDRA-14261) 
 + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)
 + * Add options to nodetool tablestats to sort and limit output 
(CASSANDRA-13889)
 + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354)
 + * Add support for hybrid MIN(), MAX() speculative retry policies
 +   (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352)
 + * Fix some regressions caused by 14058 (CASSANDRA-14353)
 + * Abstract repair for pluggable storage (CASSANDRA-14116)
 + * Add meaningful toString() impls (CASSANDRA-13653)
 + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
 + * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
 + * Add coordinator write metric per CF (CASSANDRA-14232)
 + * Correct and clarify SSLFactory.getSslContext method and call sites 
(CASSANDRA-14314)
 + * Handle static and partition deletion properly on 
ThrottledUnfilteredIterator (CASSANDRA-14315)
 + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
 + * Add ability to specify driver name and version (CASSANDRA-14275)
 + * Abstract streaming for pluggable storage (CASSANDRA-14115)
 + * Forced incremental repairs should promote sstables if they can 
(CASSANDRA-14294)
 + * Use Murmur3 for validation compactions (CASSANDRA-14002)
 + * Comma at the end of the seed list is interpreted as localhost 
(CASSANDRA-14285)
 + * Refactor read executor and response resolver, abstract read repair 
(CASSANDRA-14058)
 + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
 + * Add a few options to nodetool verify (CASSANDRA-14201)
 + * CVE-2017-5929 Security vulnerability and redefine default log rotation 
policy (CASSANDRA-14183)
 + * Use JVM default SSL validation algorithm instead of custom default 
(CASSANDRA-13259)
 + * Better document in code InetAddressAndPort usage post 7544, incorporate 
port into UUIDGen node (CASSANDRA-14226)
 + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132)
 + * Make it possible to change neverPurgeTombstones during runtime 
(CASSANDRA-14214)
 + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174)
 + * Add nodetool clientlist (CASSANDRA-13665)
 + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
 + * Non-disruptive seed node list reload (CASSANDRA-14190)
 + * Nodetool tablehistograms to print statistics for all the tables 
(CASSANDRA-14185)
 + * Migrate dtests to use pytest and python3 (CASSANDRA-14134)
 + * Allow storage port to be configurable per node (CASSANDRA-7544)
 + * Make sub-range selection for non-frozen collections return null instead of 
empty (CASSANDRA-14182)
 + * BloomFilter serialization format should not change byte ordering 
(CASSANDRA-9067)
 + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)
 + * Delete temp test files on exit (CASSANDRA-14153)
 + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
 + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
 + * Fix cassandra-stress startup failure (CASSANDRA-14106)
 + * Remove initialDirectories from CFS (CASSANDRA-13928)
 + * Fix trivial log format error (CASSANDRA-14015)
 + * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
 + * Add option to optimise merkle tree comparison across replicas 
(CASSANDRA-3200)
 + * Remove unused and deprecated methods from AbstractCompactionStrategy 
(CASSANDRA-14081)
 + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
 + * Support a means of logging all queries as they were invoked 
(CASSANDRA-13983)
 + * Presize collections (CASSANDRA-13760)
 + * Add GroupCommitLogService (CASSANDRA-13530)
 + * Parallelize initial materialized view build (CASSANDRA-12245)
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt 
(CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild 
(CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes 
(CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation 
(CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming 
(CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf 
(CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times 
(CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests 
(CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM 
(CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 
(CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair 
(CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization 
(CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps 
(CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception 
(CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds 
(CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes 
(CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs 
(CASSANDRA-13789)
 + * Simplify mx4j configuration (CASSANDRA-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minimum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables 
(CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new 
Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped 
(CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache 
(CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config 
(CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from 
(CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause 
(CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool 
setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling 
(CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction 
(CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache 
plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads 
(CASSANDRA-13521)
 + * Bring back maxHintTTL property (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming 
(CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions 
(CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times 
(CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order 
(CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string 
(CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 into account when estimating remaining compaction 
tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements 
(CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples 
(CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from 
actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID 
functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers 
(CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService 
(CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files 
(CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair 
(CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace 
(CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter 
(CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication 
factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection 
(CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes 
(CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options 
(CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the 
repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 + * nodetool clearsnapshot requires --all to clear all snapshots 
(CASSANDRA-13391)
 + * Correctly count range tombstones in traces and tombstone thresholds 
(CASSANDRA-8527)
 + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243)
 + * Multi-version in-JVM dtests (CASSANDRA-14937)
 + * Allow instance class loaders to be garbage collected for inJVM dtest 
(CASSANDRA-15170)
 +
  
  3.11.5
 - * Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169)
   * Make sure user defined compaction transactions are always closed 
(CASSANDRA-15123)
   * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config 
(CASSANDRA-14305)
   * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 326c005,fa00e5b..61129bb
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2194,29 -2259,25 +2195,28 @@@ public class ColumnFamilyStore implemen
          // and so we only run one major compaction at a time
          synchronized (this)
          {
 -            logger.trace("Cancelling in-progress compactions for {}", 
metadata.cfName);
 +            logger.trace("Cancelling in-progress compactions for {}", 
metadata.name);
 +            Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes
 +                                                         ? concatWithIndexes()
 +                                                         : 
Collections.singleton(this);
  
 -            Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
 -                                                               ? 
Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
 -                                                               : 
concatWithIndexes();
 +            toInterruptFor = interruptViews
 +                             ? Iterables.concat(toInterruptFor, 
viewManager.allViewsCfs())
 +                             : toInterruptFor;
  
-             for (ColumnFamilyStore cfs : toInterruptFor)
-                 cfs.getCompactionStrategyManager().pause();
-             try
+             try (CompactionManager.CompactionPauser pause = 
CompactionManager.instance.pauseGlobalCompaction();
 -                 CompactionManager.CompactionPauser pausedStrategies = 
pauseCompactionStrategies(selfWithAuxiliaryCfs))
++                 CompactionManager.CompactionPauser pausedStrategies = 
pauseCompactionStrategies(toInterruptFor))
              {
                  // interrupt in-progress compactions
 -                
CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, 
interruptValidation);
 -                
CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
 +                
CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, 
sstablesPredicate, interruptValidation);
 +                CompactionManager.instance.waitForCessation(toInterruptFor, 
sstablesPredicate);
  
                  // doublecheck that we finished, instead of timing out
 -                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
 +                for (ColumnFamilyStore cfs : toInterruptFor)
                  {
 -                    if (!cfs.getTracker().getCompacting().isEmpty())
 +                    if 
(cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
                      {
 -                        logger.warn("Unable to cancel in-progress compactions 
for {}.  Perhaps there is an unusually large row in progress somewhere, or the 
system is simply overloaded.", metadata.cfName);
 +                        logger.warn("Unable to cancel in-progress compactions 
for {}.  Perhaps there is an unusually large row in progress somewhere, or the 
system is simply overloaded.", metadata.name);
                          return null;
                      }
                  }
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 09bed74,a6dbd9d..275ce70
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@@ -184,32 -165,15 +184,38 @@@ public final class CompactionInf
              stopRequested = true;
          }
  
+         /**
+          * if this compaction involves several/all tables we can safely check 
globalCompactionsPaused
+          * in isStopRequested() below
+          */
+         public abstract boolean isGlobal();
+ 
          public boolean isStopRequested()
          {
-             return stopRequested;
+             return stopRequested || (isGlobal() && 
CompactionManager.instance.isGlobalCompactionPaused());
          }
      }
 +
 +    public enum Unit
 +    {
 +        BYTES("bytes"), RANGES("token range parts"), KEYS("keys");
 +
 +        private final String name;
 +
 +        Unit(String name)
 +        {
 +            this.name = name;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return this.name;
 +        }
 +
 +        public static boolean isFileSize(String unit)
 +        {
 +            return BYTES.toString().equals(unit);
 +        }
 +    }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 789d1ee,9aba938..1128108
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -120,10 -124,14 +120,15 @@@ public class CompactionIterator extend
                                    type,
                                    bytesRead,
                                    totalBytes,
 -                                  compactionId);
 +                                  compactionId,
 +                                  sstables);
      }
  
+     public boolean isGlobal()
+     {
+         return false;
+     }
+ 
      private void updateCounterFor(int rows)
      {
          assert rows > 0 && rows - 1 < mergeCounters.length;
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bb1d585,a08d08b..cfca7d9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -131,15 -118,11 +131,18 @@@ public class CompactionManager implemen
      @VisibleForTesting
      final Multiset<ColumnFamilyStore> compactingCF = 
ConcurrentHashMultiset.create();
  
 +    public final ActiveCompactions active = new ActiveCompactions();
 +
+     // used to temporarily pause non-strategy managed compactions (like index 
summary redistribution)
+     private final AtomicInteger globalCompactionPauseCount = new 
AtomicInteger(0);
+ 
      private final RateLimiter compactionRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
  
 +    public CompactionMetrics getMetrics()
 +    {
 +        return metrics;
 +    }
 +
      /**
       * Gets compaction rate limiter.
       * Rate unit is bytes per sec.
@@@ -2161,14 -2136,25 +2164,37 @@@
          }
      }
  
++
 +    public List<CompactionInfo> getSSTableTasks()
 +    {
 +        return active.getCompactions()
 +                     .stream()
 +                     .map(CompactionInfo.Holder::getCompactionInfo)
 +                     .filter(task -> task.getTaskType() != 
OperationType.COUNTER_CACHE_SAVE
 +                                     && task.getTaskType() != 
OperationType.KEY_CACHE_SAVE
 +                                     && task.getTaskType() != 
OperationType.ROW_CACHE_SAVE)
 +                     .collect(Collectors.toList());
 +    }
++
+     /**
+      * Return whether "global" compactions should be paused, used by 
ColumnFamilyStore#runWithCompactionsDisabled
+      *
+      * a global compaction is one that includes several/all tables, currently 
only IndexSummaryRedistribution
+      */
+     public boolean isGlobalCompactionPaused()
+     {
+         return globalCompactionPauseCount.get() > 0;
+     }
+ 
+     public CompactionPauser pauseGlobalCompaction()
+     {
+         CompactionPauser pauser = globalCompactionPauseCount::decrementAndGet;
+         globalCompactionPauseCount.incrementAndGet();
+         return pauser;
+     }
+ 
+     public interface CompactionPauser extends AutoCloseable
+     {
+         public void close();
+     }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 685da2e,2efcd11..725f04d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -184,13 -184,17 +184,15 @@@ public class CompactionTask extends Abs
  
                  long lastBytesScanned = 0;
  
-                 if (!controller.cfs.getCompactionStrategyManager().isActive())
-                     throw new 
CompactionInterruptedException(ci.getCompactionInfo());
 -                if (collector != null)
 -                    collector.beginCompaction(ci);
--
 +                activeCompactions.beginCompaction(ci);
- 
                  try (CompactionAwareWriter writer = 
getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                  {
+                     // Note that we need to re-check this flag after calling 
beginCompaction above to avoid a window
+                     // where the compaction does not exist in 
activeCompactions but the CSM gets paused.
+                     // We already have the sstables marked compacting here so 
CompactionManager#waitForCessation will
+                     // block until the below exception is thrown and the 
transaction is cancelled.
+                     if 
(!controller.cfs.getCompactionStrategyManager().isActive())
+                         throw new 
CompactionInterruptedException(ci.getCompactionInfo());
                      estimatedKeys = writer.estimatedKeys();
                      while (ci.hasNext())
                      {
diff --cc src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
index d041b48,0000000..c84c697
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@@ -1,269 -1,0 +1,274 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.view;
 +
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.UUID;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Function;
 +import com.google.common.base.Objects;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.PeekingIterator;
 +import com.google.common.util.concurrent.Futures;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.ReadExecutionController;
 +import org.apache.cassandra.db.ReadQuery;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.CompactionInfo;
 +import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
 +import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Rows;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +public class ViewBuilderTask extends CompactionInfo.Holder implements 
Callable<Long>
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(ViewBuilderTask.class);
 +
 +    private static final int ROWS_BETWEEN_CHECKPOINTS = 1000;
 +
 +    private final ColumnFamilyStore baseCfs;
 +    private final View view;
 +    private final Range<Token> range;
 +    private final UUID compactionId;
 +    private volatile Token prevToken;
 +    private volatile long keysBuilt = 0;
 +    private volatile boolean isStopped = false;
 +    private volatile boolean isCompactionInterrupted = false;
 +
 +    @VisibleForTesting
 +    public ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> 
range, Token lastToken, long keysBuilt)
 +    {
 +        this.baseCfs = baseCfs;
 +        this.view = view;
 +        this.range = range;
 +        this.compactionId = UUIDGen.getTimeUUID();
 +        this.prevToken = lastToken;
 +        this.keysBuilt = keysBuilt;
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private void buildKey(DecoratedKey key)
 +    {
 +        ReadQuery selectQuery = view.getReadQuery();
 +
 +        if (!selectQuery.selectsKey(key))
 +        {
 +            logger.trace("Skipping {}, view query filters", key);
 +            return;
 +        }
 +
 +        int nowInSec = FBUtilities.nowInSeconds();
 +        SinglePartitionReadCommand command = 
view.getSelectStatement().internalReadForView(key, nowInSec);
 +
 +        // We're rebuilding everything from what's on disk, so we read 
everything, consider that as new updates
 +        // and pretend that there is nothing pre-existing.
 +        UnfilteredRowIterator empty = 
UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, 
Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
 +
 +        try (ReadExecutionController orderGroup = 
command.executionController();
 +             UnfilteredRowIterator data = 
UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), 
command))
 +        {
 +            Iterator<Collection<Mutation>> mutations = 
baseCfs.keyspace.viewManager
 +                                                       
.forTable(baseCfs.metadata.id)
 +                                                       
.generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
 +
 +            AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
 +            mutations.forEachRemaining(m -> 
StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
 +        }
 +    }
 +
 +    public Long call()
 +    {
 +        String ksName = baseCfs.metadata.keyspace;
 +
 +        if (prevToken == null)
 +            logger.debug("Starting new view build for range {}", range);
 +        else
 +            logger.debug("Resuming view build for range {} from token {} with 
{} covered keys", range, prevToken, keysBuilt);
 +
 +        /*
 +         * It's possible for view building to start before MV creation got 
propagated to other nodes. For this reason
 +         * we should wait for schema to converge before attempting to send 
any view mutations to other nodes, or else
 +         * face UnknownTableException upon Mutation deserialization on the 
nodes that haven't processed the schema change.
 +         */
 +        boolean schemaConverged = 
Gossiper.instance.waitForSchemaAgreement(10, TimeUnit.SECONDS, () -> 
this.isStopped);
 +        if (!schemaConverged)
 +            logger.warn("Failed to get schema to converge before building 
view {}.{}", baseCfs.keyspace.getName(), view.name);
 +
 +        Function<org.apache.cassandra.db.lifecycle.View, 
Iterable<SSTableReader>> function;
 +        function = 
org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> 
range.intersects(s.getBounds()));
 +
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = 
baseCfs.selectAndReference(function);
 +             Refs<SSTableReader> sstables = viewFragment.refs;
 +             ReducingKeyIterator keyIter = new ReducingKeyIterator(sstables))
 +        {
 +            PeekingIterator<DecoratedKey> iter = 
Iterators.peekingIterator(keyIter);
 +            while (!isStopped && iter.hasNext())
 +            {
 +                DecoratedKey key = iter.next();
 +                Token token = key.getToken();
 +                //skip tokens already built or not present in range
 +                if (range.contains(token) && (prevToken == null || 
token.compareTo(prevToken) > 0))
 +                {
 +                    buildKey(key);
 +                    ++keysBuilt;
 +                    //build other keys sharing the same token
 +                    while (iter.hasNext() && 
iter.peek().getToken().equals(token))
 +                    {
 +                        key = iter.next();
 +                        buildKey(key);
 +                        ++keysBuilt;
 +                    }
 +                    if (keysBuilt % ROWS_BETWEEN_CHECKPOINTS == 1)
 +                        SystemKeyspace.updateViewBuildStatus(ksName, 
view.name, range, token, keysBuilt);
 +                    prevToken = token;
 +                }
 +            }
 +        }
 +
 +        finish();
 +
 +        return keysBuilt;
 +    }
 +
 +    private void finish()
 +    {
 +        String ksName = baseCfs.keyspace.getName();
 +        if (!isStopped)
 +        {
 +            // Save the completed status using the end of the range as last 
token. This way it will be possible for
 +            // future view build attempts to not even create a task for 
this range
 +            SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, 
range.right, keysBuilt);
 +
 +            logger.debug("Completed build of view({}.{}) for range {} after 
covering {} keys ", ksName, view.name, range, keysBuilt);
 +        }
 +        else
 +        {
 +            logger.debug("Stopped build for view({}.{}) for range {} after 
covering {} keys", ksName, view.name, range, keysBuilt);
 +
 +            // If it's stopped due to a compaction interruption we should 
throw that exception.
 +            // Otherwise we assume that the task has been stopped due to a 
schema update and we can finish successfully.
 +            if (isCompactionInterrupted)
 +                throw new StoppedException(ksName, view.name, 
getCompactionInfo());
 +        }
 +    }
 +
 +    @Override
 +    public CompactionInfo getCompactionInfo()
 +    {
 +        // we don't know the sstables at construction of ViewBuilderTask and 
we could change this to return them once we know them,
 +        // but since we basically only cancel view builds on truncation where 
we cancel all compactions anyway, this seems reasonable
 +
 +        // If there's a splitter, calculate progress based on last token 
position
 +        if (range.left.getPartitioner().splitter().isPresent())
 +        {
 +            long progress = prevToken == null ? 0 : 
Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken,
 range) * 1000);
 +            return CompactionInfo.withoutSSTables(baseCfs.metadata(), 
OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
 +        }
 +
 +        // When there is no splitter, estimate based on number of total keys 
but
 +        // take the max with keysBuilt + 1 to avoid having more completed 
than total
 +        long keysTotal = Math.max(keysBuilt + 1, 
baseCfs.estimatedKeysForRange(range));
 +        return CompactionInfo.withoutSSTables(baseCfs.metadata(), 
OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
 +    }
 +
 +    @Override
 +    public void stop()
 +    {
 +        stop(true);
 +    }
 +
++    public boolean isGlobal()
++    {
++        return false;
++    }
++
 +    synchronized void stop(boolean isCompactionInterrupted)
 +    {
 +        isStopped = true;
 +        this.isCompactionInterrupted = isCompactionInterrupted;
 +    }
 +
 +    long keysBuilt()
 +    {
 +        return keysBuilt;
 +    }
 +
 +    /**
 +     * {@link CompactionInterruptedException} with {@link 
Object#equals(Object)} and {@link Object#hashCode()}
 +     * implementations that consider equal all the exceptions produced by 
the same view build, independently of their
 +     * token range.
 +     * <p>
 +     * This is used to avoid Guava's {@link Futures#allAsList(Iterable)} log 
spamming when multiple build tasks fail
 +     * due to compaction interruption.
 +     */
 +    static class StoppedException extends CompactionInterruptedException
 +    {
 +        private final String ksName, viewName;
 +
 +        private StoppedException(String ksName, String viewName, 
CompactionInfo info)
 +        {
 +            super(info);
 +            this.ksName = ksName;
 +            this.viewName = viewName;
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (!(o instanceof StoppedException))
 +                return false;
 +
 +            StoppedException that = (StoppedException) o;
 +            return Objects.equal(this.ksName, that.ksName) && 
Objects.equal(this.viewName, that.viewName);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return 31 * ksName.hashCode() + viewName.hashCode();
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index 9ec8a4e,8276626..73dc334
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@@ -25,4 -25,8 +25,9 @@@ import org.apache.cassandra.db.compacti
  public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder
  {
      public abstract void build();
++
+     public boolean isGlobal()
+     {
+         return false;
+     }
  }
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 1f4059a,dea1cd6..880b738
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -228,13 -224,13 +228,15 @@@ public class IndexSummaryManager implem
  
      public void redistributeSummaries() throws IOException
      {
+         if (CompactionManager.instance.isGlobalCompactionPaused())
+             return;
 -        Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> 
compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
 +        Pair<Long, Map<TableId, LifecycleTransaction>> 
redistributionTransactionInfo = getRestributionTransactions();
 +        Map<TableId, LifecycleTransaction> transactions = 
redistributionTransactionInfo.right;
 +        long nonRedistributingOffHeapSize = 
redistributionTransactionInfo.left;
          try
          {
 -            redistributeSummaries(new 
IndexSummaryRedistribution(compactingAndNonCompacting.left,
 -                                                                 
compactingAndNonCompacting.right,
 +            redistributeSummaries(new IndexSummaryRedistribution(transactions,
 +                                                                 
nonRedistributingOffHeapSize,
                                                                   
this.memoryPoolBytes));
          }
          catch (Exception e)
diff --cc 
src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index a8fcad1,d80adc0..1300c99
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -305,9 -312,14 +306,14 @@@ public class IndexSummaryRedistributio
  
      public CompactionInfo getCompactionInfo()
      {
 -        return new CompactionInfo(OperationType.INDEX_SUMMARY, 
(memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
 +        return CompactionInfo.withoutSSTables(null, 
OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), 
memoryPoolBytes, Unit.BYTES, compactionId);
      }
  
+     public boolean isGlobal()
+     {
+         return true;
+     }
+ 
      /** Utility class for sorting sstables by their read rates. */
      private static class ReadRateComparator implements 
Comparator<SSTableReader>
      {
diff --cc 
test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
index 976180c,bcbe92d..60433bb
--- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@@ -18,436 -18,88 +18,506 @@@
  
  package org.apache.cassandra.db.compaction;
  
 +import java.util.ArrayList;
  import java.util.Collections;
 +import java.util.HashSet;
  import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
  import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
  
 +import com.google.common.collect.ImmutableSet;
- import org.junit.BeforeClass;
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Test;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.metrics.CompactionMetrics;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.repair.PendingAntiCompaction;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.StubIndex;
 +import org.apache.cassandra.index.internal.CollatedViewIndexBuilder;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.schema.MockSchema;
++import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.FBUtilities;
  
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
  
  public class CancelCompactionsTest extends CQLTester
  {
 +    /**
 +     * makes sure we only cancel compactions if the predicate says we have 
overlapping sstables
 +     */
 +    @Test
 +    public void cancelTest() throws InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
 +        Set<SSTableReader> toMarkCompacting = new 
HashSet<>(sstables.subList(0, 3));
 +
 +        TestCompactionTask tct = new TestCompactionTask(cfs, 
toMarkCompacting);
 +        try
 +        {
 +            tct.start();
 +
 +            List<CompactionInfo.Holder> activeCompactions = 
CompactionManager.instance.active.getCompactions();
 +            assertEquals(1, activeCompactions.size());
 +            
assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), 
toMarkCompacting);
 +            // predicate requires the non-compacting sstables, should not 
cancel the one currently compacting:
 +            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> 
!toMarkCompacting.contains(sstable), false, false, true);
 +            assertEquals(1, activeCompactions.size());
 +            assertFalse(activeCompactions.get(0).isStopRequested());
 +
 +            // predicate requires the compacting ones - make sure stop is 
requested and that when we abort that
 +            // compaction we actually run the callable (countdown the latch)
 +            CountDownLatch cdl = new CountDownLatch(1);
 +            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> 
{ cdl.countDown(); return null; }, toMarkCompacting::contains, false, false, 
true));
 +            t.start();
 +            while (!activeCompactions.get(0).isStopRequested())
 +                Thread.sleep(100);
 +
 +            // cdl.countDown will not get executed until we have aborted all 
compactions for the sstables in toMarkCompacting
 +            assertFalse(cdl.await(2, TimeUnit.SECONDS));
 +            tct.abort();
 +            // now the compactions are aborted and we can successfully wait 
for the latch
 +            t.join();
 +            assertTrue(cdl.await(2, TimeUnit.SECONDS));
 +        }
 +        finally
 +        {
 +            tct.abort();
 +        }
 +    }
 +
 +    /**
 +     * make sure we only cancel relevant compactions when there are multiple 
ongoing compactions
 +     */
 +    @Test
 +    public void multipleCompactionsCancelTest() throws InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
 +
 +        List<TestCompactionTask> tcts = new ArrayList<>();
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(0, 3))));
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(6, 9))));
 +
 +        try
 +        {
 +            tcts.forEach(TestCompactionTask::start);
 +
 +            List<CompactionInfo.Holder> activeCompactions = 
CompactionManager.instance.active.getCompactions();
 +            assertEquals(2, activeCompactions.size());
 +
 +            Set<Set<SSTableReader>> compactingSSTables = new HashSet<>();
 +            
compactingSSTables.add(activeCompactions.get(0).getCompactionInfo().getSSTables());
 +            
compactingSSTables.add(activeCompactions.get(1).getCompactionInfo().getSSTables());
 +            Set<Set<SSTableReader>> expectedSSTables = new HashSet<>();
 +            expectedSSTables.add(new HashSet<>(sstables.subList(0, 3)));
 +            expectedSSTables.add(new HashSet<>(sstables.subList(6, 9)));
 +            assertEquals(compactingSSTables, expectedSSTables);
 +
 +            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, 
false, false, true);
 +            assertEquals(2, activeCompactions.size());
 +            
assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested));
 +
 +            CountDownLatch cdl = new CountDownLatch(1);
 +            // start a compaction which only needs the sstables where first 
token is > 50 - these are the sstables compacted by tcts.get(1)
 +            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> 
{ cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, 
false, true));
 +            t.start();
 +            activeCompactions = 
CompactionManager.instance.active.getCompactions();
 +            assertEquals(2, activeCompactions.size());
 +            Thread.sleep(500);
 +            for (CompactionInfo.Holder holder : activeCompactions)
 +            {
 +                if 
(holder.getCompactionInfo().getSSTables().containsAll(sstables.subList(6, 9)))
 +                    assertTrue(holder.isStopRequested());
 +                else
 +                    assertFalse(holder.isStopRequested());
 +            }
 +            tcts.get(1).abort();
 +            assertEquals(1, 
CompactionManager.instance.active.getCompactions().size());
 +            cdl.await();
 +            t.join();
 +        }
 +        finally
 +        {
 +            tcts.forEach(TestCompactionTask::abort);
 +        }
 +    }
 +
 +    /**
 +     * Makes sure sub range compaction only cancels the compactions whose 
sstables intersect the requested range, not all of them
 +     */
 +    @Test
 +    public void testSubrangeCompaction() throws InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0); // sstable i is created with first = i * 10 and last = i * 10 + 9
 +
 +        List<TestCompactionTask> tcts = new ArrayList<>(); // four fake compactions over disjoint subsets of the sstables
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(0, 2))));
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(3, 4))));
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(5, 7))));
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(8, 9))));
 +        try
 +        {
 +            tcts.forEach(TestCompactionTask::start);
 +
 +            List<CompactionInfo.Holder> activeCompactions = 
CompactionManager.instance.active.getCompactions();
 +            assertEquals(4, activeCompactions.size());
 +            Range<Token> range = new Range<>(token(0), token(49)); // per the assertions below, only two of the four tasks touch this range
 +            Thread t = new Thread(() -> { // separate thread: this one keeps inspecting and aborting the tasks meanwhile
 +                try
 +                {
 +                    
cfs.forceCompactionForTokenRange(Collections.singleton(range));
 +                }
 +                catch (Throwable e)
 +                {
 +                    throw new RuntimeException(e);
 +                }
 +            });
 +
 +            t.start();
 +
 +            Thread.sleep(500); // NOTE(review): timing-based; presumably long enough for the stop requests to be issued - confirm
 +            assertEquals(4, 
CompactionManager.instance.active.getCompactions().size());
 +            List<TestCompactionTask> toAbort = new ArrayList<>(); // tasks whose holder was asked to stop
 +            for (CompactionInfo.Holder holder : 
CompactionManager.instance.active.getCompactions())
 +            {
 +                if 
(holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> 
sstable.intersects(Collections.singleton(range))))
 +                {
 +                    assertTrue(holder.isStopRequested());
 +                    for (TestCompactionTask tct : tcts)
 +                        if 
(tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
 +                            toAbort.add(tct);
 +                }
 +                else
 +                    assertFalse(holder.isStopRequested()); // compactions outside the range must be left alone
 +            }
 +            assertEquals(2, toAbort.size());
 +            toAbort.forEach(TestCompactionTask::abort); // end the stopped tasks, then wait for the compaction thread
 +            t.join();
 +
 +        }
 +        finally
 +        {
 +            tcts.forEach(TestCompactionTask::abort); // cleanup; tasks aborted above are aborted a second time here
 +        }
 +    }
 +
 +    @Test // a pending anti-compaction must only stop compactions holding unrepaired, non-pending sstables in its range
 +    public void testAnticompaction() throws InterruptedException, 
ExecutionException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
 +        List<SSTableReader> alreadyRepairedSSTables = createSSTables(cfs, 10, 
10);
 +        for (SSTableReader sstable : alreadyRepairedSSTables)
 +            AbstractPendingRepairTest.mutateRepaired(sstable, 
System.currentTimeMillis()); // presumably flags the second batch as repaired - confirm
 +        assertEquals(20, cfs.getLiveSSTables().size());
 +        List<TestCompactionTask> tcts = new ArrayList<>(); // compactions over sstables from the first batch
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(0, 2))));
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(3, 4))));
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(5, 7))));
 +        tcts.add(new TestCompactionTask(cfs, new 
HashSet<>(sstables.subList(8, 9))));
 +
 +        List<TestCompactionTask> nonAffectedTcts = new ArrayList<>(); // one compaction over the whole second batch
 +        nonAffectedTcts.add(new TestCompactionTask(cfs, new 
HashSet<>(alreadyRepairedSSTables)));
 +
 +        try
 +        {
 +            tcts.forEach(TestCompactionTask::start);
 +            nonAffectedTcts.forEach(TestCompactionTask::start);
 +            List<CompactionInfo.Holder> activeCompactions = 
CompactionManager.instance.active.getCompactions();
 +            assertEquals(5, activeCompactions.size());
 +            // make sure that sstables are fully contained so that the 
metadata gets mutated
 +            Range<Token> range = new Range<>(token(-1), token(49));
 +
 +            UUID prsid = UUID.randomUUID();
 +            ActiveRepairService.instance.registerParentRepairSession(prsid, 
InetAddressAndPort.getLocalHost(), Collections.singletonList(cfs), 
Collections.singleton(range), true, 1, true, PreviewKind.NONE);
 +
 +            InetAddressAndPort local = 
FBUtilities.getBroadcastAddressAndPort();
 +            RangesAtEndpoint rae = RangesAtEndpoint.builder(local).add(new 
Replica(local, range, true)).build();
 +
 +            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), rae, Executors.newSingleThreadExecutor(), () -> 
false);
 +            Future<?> fut = pac.run(); // NOTE(review): the executor created inline above is never shut down in this test
 +            Thread.sleep(600); // NOTE(review): timing-based wait before inspecting the stop flags
 +            List<TestCompactionTask> toAbort = new ArrayList<>(); // tasks whose holder was asked to stop
 +            for (CompactionInfo.Holder holder : 
CompactionManager.instance.active.getCompactions())
 +            {
 +                if 
(holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> 
sstable.intersects(Collections.singleton(range)) && !sstable.isRepaired() && 
!sstable.isPendingRepair()))
 +                {
 +                    assertTrue(holder.isStopRequested());
 +                    for (TestCompactionTask tct : tcts)
 +                        if 
(tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
 +                            toAbort.add(tct);
 +                }
 +                else
 +                    assertFalse(holder.isStopRequested()); // everything else, including the second-batch task, keeps running
 +            }
 +            assertEquals(2, toAbort.size());
 +            toAbort.forEach(TestCompactionTask::abort);
 +            fut.get(); // wait for the anti-compaction future to complete
 +            for (SSTableReader sstable : sstables) // every first-batch sstable in the range must now be pending repair
 +                assertTrue(!sstable.intersects(Collections.singleton(range)) 
|| sstable.isPendingRepair());
 +        }
 +        finally
 +        {
 +            tcts.forEach(TestCompactionTask::abort); // cleanup; tasks aborted above are aborted a second time here
 +            nonAffectedTcts.forEach(TestCompactionTask::abort);
 +        }
 +    }
 +
 +    /**
 +     * Make sure a running index build is flagged as stop-requested by runWithCompactionsDisabled
 +     */
 +    @Test
 +    public void testIndexRebuild() throws ExecutionException, 
InterruptedException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        List<SSTableReader> sstables = createSSTables(cfs, 5, 0);
 +        Index idx = new StubIndex(cfs, null);
 +        CountDownLatch indexBuildStarted = new CountDownLatch(1); // released when the build first calls hasNext()
 +        CountDownLatch indexBuildRunning = new CountDownLatch(1); // holds the build inside hasNext() until released
 +        CountDownLatch compactionsStopped = new CountDownLatch(1); // released by the runWithCompactionsDisabled callable
 +        ReducingKeyIterator reducingKeyIterator = new 
ReducingKeyIterator(sstables)
 +        {
 +            @Override
 +            public boolean hasNext()
 +            {
 +                indexBuildStarted.countDown();
 +                try
 +                {
 +                    indexBuildRunning.await();
 +                }
 +                catch (InterruptedException e)
 +                {
 +                    throw new RuntimeException(e); // keep the interrupt as the cause instead of dropping it
 +                }
 +                return false; // report no keys once released
 +            }
 +        };
 +        Future<?> f = CompactionManager.instance.submitIndexBuild(new 
CollatedViewIndexBuilder(cfs, Collections.singleton(idx), reducingKeyIterator, 
ImmutableSet.copyOf(sstables)));
 +        // wait for hasNext to get called
 +        indexBuildStarted.await();
 +        assertEquals(1, 
CompactionManager.instance.active.getCompactions().size());
 +        boolean foundCompaction = false;
 +        for (CompactionInfo.Holder holder : 
CompactionManager.instance.active.getCompactions())
 +        {
 +            if (holder.getCompactionInfo().getSSTables().equals(new 
HashSet<>(sstables)))
 +            {
 +                assertFalse(holder.isStopRequested()); // nothing has asked the build to stop yet
 +                foundCompaction = true;
 +            }
 +        }
 +        assertTrue(foundCompaction);
 +        cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); 
return null;}, (sstable) -> true, false, false, true);
 +        // wait for the runWithCompactionsDisabled callable
 +        compactionsStopped.await();
 +        assertEquals(1, 
CompactionManager.instance.active.getCompactions().size());
 +        foundCompaction = false;
 +        for (CompactionInfo.Holder holder : 
CompactionManager.instance.active.getCompactions())
 +        {
 +            if (holder.getCompactionInfo().getSSTables().equals(new 
HashSet<>(sstables)))
 +            {
 +                assertTrue(holder.isStopRequested()); // the still-registered build has now been told to stop
 +                foundCompaction = true;
 +            }
 +        }
 +        assertTrue(foundCompaction);
 +        // signal that the index build should be finished
 +        indexBuildRunning.countDown();
 +        f.get();
 +        
assertTrue(CompactionManager.instance.active.getCompactions().isEmpty());
 +    }
 +
 +    long first(SSTableReader sstable) // token value of the sstable's first key, cast to long
 +    {
 +        return (long)sstable.first.getToken().getTokenValue();
 +    }
 +
 +    Token token(long t) // wraps t in a Murmur3Partitioner.LongToken
 +    {
 +        return new Murmur3Partitioner.LongToken(t);
 +    }
 +
 +    private List<SSTableReader> createSSTables(ColumnFamilyStore cfs, int 
count, int startGeneration) // builds count mock sstables, adds them to cfs and returns them in creation order
 +    {
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        for (int i = 0; i < count; i++)
 +        {
 +            long first = i * 10; // each sstable gets its own block of ten values: [i * 10, i * 10 + 9]
 +            long last  = (i + 1) * 10 - 1;
 +            sstables.add(MockSchema.sstable(startGeneration + i, 0, true, 
first, last, cfs));
 +        }
 +        cfs.disableAutoCompaction(); // auto compaction is switched off before the sstables are added
 +        cfs.addSSTables(sstables);
 +        return sstables;
 +    }
 +
 +    private static class TestCompactionTask // test double: registers a compaction over fixed sstables without iterating it
 +    {
 +        private ColumnFamilyStore cfs;
 +        private final Set<SSTableReader> sstables;
 +        private LifecycleTransaction txn; // txn, controller, ci and scanners stay null until start() is called
 +        private CompactionController controller;
 +        private CompactionIterator ci;
 +        private List<ISSTableScanner> scanners;
 +
 +        public TestCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> 
sstables)
 +        {
 +            this.cfs = cfs;
 +            this.sstables = sstables;
 +        }
 +
 +        public void start() // opens scanners, takes a COMPACTION transaction and registers the iterator as active
 +        {
 +            scanners = 
sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 +            txn = cfs.getTracker().tryModify(sstables, 
OperationType.COMPACTION);
 +            assertNotNull(txn); // fails the test if tryModify returned null
 +            controller = new CompactionController(cfs, sstables, 
Integer.MIN_VALUE);
 +            ci = new CompactionIterator(txn.opType(), scanners, controller, 
FBUtilities.nowInSeconds(), UUID.randomUUID());
 +            CompactionManager.instance.active.beginCompaction(ci);
 +        }
 +
 +        public void abort() // releases what start() acquired; the tests call it more than once on the same task
 +        {
 +            if (controller != null)
 +                controller.close();
 +            if (ci != null)
 +                ci.close();
 +            if (txn != null)
 +                txn.abort();
 +            if (scanners != null)
 +                scanners.forEach(ISSTableScanner::close);
 +            CompactionManager.instance.active.finishCompaction(ci); // NOTE(review): runs even when ci is null (start() never completed) - confirm that is safe
 +
 +        }
 +    }
 +
 +    @Test // with only the 2i sstables compacting, the predicate given to the base table must never be called
 +    public void test2iCancellation() throws Throwable
 +    {
 +        createTable("create table %s (id int primary key, something int)");
 +        createIndex("create index on %s(something)");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 10; i++)
 +            execute("insert into %s (id, something) values (?, ?)", i, i);
 +        flush();
 +        ColumnFamilyStore idx = 
getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next();
 +        Set<SSTableReader> sstables = new HashSet<>(); // records every sstable the predicate is asked about
 +        try (LifecycleTransaction txn = 
idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
 +        {
 +            getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> 
true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false);
 +        }
 +        // the predicate only gets compacting sstables, and we are only 
compacting the 2i sstables - with interruptIndexes = false we should see no 
sstables here
 +        assertTrue(sstables.isEmpty());
 +    }
 +
 +    @Test // no assertions: passes if the call returns normally while the 2i sstables are held in a COMPACTION transaction
 +    public void testSubrangeCompactionWith2i() throws Throwable
 +    {
 +        createTable("create table %s (id int primary key, something int)");
 +        createIndex("create index on %s(something)");
 +        getCurrentColumnFamilyStore().disableAutoCompaction();
 +        for (int i = 0; i < 10; i++)
 +            execute("insert into %s (id, something) values (?, ?)", i, i);
 +        flush();
 +        ColumnFamilyStore idx = 
getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next();
 +        try (LifecycleTransaction txn = 
idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
 +        {
 +            IPartitioner partitioner = 
getCurrentColumnFamilyStore().getPartitioner(); // the range below runs from the partitioner's minimum to its maximum token
 +            
getCurrentColumnFamilyStore().forceCompactionForTokenRange(Collections.singleton(new
 Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken())));
 +        }
 +    }
++
+     @Test // a compaction interrupted just before beginCompaction must fail with CompactionInterruptedException
+     public void testStandardCompactionTaskCancellation() throws Throwable
+     {
+         createTable("create table %s (id int primary key, something int)");
+         getCurrentColumnFamilyStore().disableAutoCompaction();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             execute("insert into %s (id, something) values (?,?)", i, i);
+             getCurrentColumnFamilyStore().forceBlockingFlush(); // flush after every insert, i.e. ten flushes
+         }
+         AbstractCompactionTask ct = null; // becomes the first non-null background task offered by any strategy
+ 
+         for (List<AbstractCompactionStrategy> css : 
getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies())
+         {
+             for (AbstractCompactionStrategy cs : css)
+             {
+                 ct = cs.getNextBackgroundTask(0);
+                 if (ct != null)
+                     break;
+             }
 -            if (ct != null)
 -                break;
++            if (ct != null) break;
+         }
+         assertNotNull(ct);
+ 
+         CountDownLatch waitForBeginCompaction = new CountDownLatch(1); // task thread -> helper thread: beginCompaction was reached
+         CountDownLatch waitForStart = new CountDownLatch(1); // helper thread -> task thread: pause and interrupt were issued
 -        Iterable<CFMetaData> metadatas = 
Collections.singleton(getCurrentColumnFamilyStore().metadata);
++        Iterable<TableMetadata> metadatas = 
Collections.singleton(getCurrentColumnFamilyStore().metadata());
+         /*
+         Here we ask strategies to pause & interrupt compactions right before 
calling beginCompaction in CompactionTask
+         The code running in the separate thread below mimics 
CFS#runWithCompactionsDisabled but we only allow
+         the real beginCompaction to be called after pausing & interrupting.
+          */
+         Thread t = new Thread(() -> {
+             Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction);
+             
getCurrentColumnFamilyStore().getCompactionStrategyManager().pause();
 -            CompactionManager.instance.interruptCompactionFor(metadatas, 
false);
++            CompactionManager.instance.interruptCompactionFor(metadatas, (s) 
-> true, false);
+             waitForStart.countDown(); // let the task thread proceed into the real beginCompaction
 -            
CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore()));
++            
CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore()),
 (s) -> true);
+             
getCurrentColumnFamilyStore().getCompactionStrategyManager().resume();
+         });
+         t.start();
+ 
+         try
+         {
 -            ct.execute(new CompactionMetrics()
++            ct.execute(new ActiveCompactions()
+             {
+                 @Override
+                 public void beginCompaction(CompactionInfo.Holder ci)
+                 {
+                     waitForBeginCompaction.countDown();
+                     Uninterruptibles.awaitUninterruptibly(waitForStart);
+                     super.beginCompaction(ci); // only registered after the pause and interrupt have happened
+                 }
+             });
+             fail("execute should throw CompactionInterruptedException");
+         }
+         catch (CompactionInterruptedException cie)
+         {
+             // expected
+         }
+         finally
+         {
+             ct.transaction.abort();
+             t.join(); // NOTE(review): blocks forever if execute() failed before beginCompaction, since the latch is then never counted down
+         }
+     }
  }
diff --cc 
test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 6bef001,0000000..1c5c245
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@@ -1,747 -1,0 +1,762 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.repair;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.stream.Collectors;
 +
 +import javax.annotation.Nullable;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.FutureCallback;
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.ListenableFuture;
 +import com.google.common.util.concurrent.ListenableFutureTask;
 +import com.google.common.util.concurrent.ListeningExecutorService;
 +import com.google.common.util.concurrent.MoreExecutors;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.compaction.AbstractPendingRepairTest;
 +import org.apache.cassandra.db.compaction.CompactionController;
 +import org.apache.cassandra.db.compaction.CompactionInfo;
 +import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 +import org.apache.cassandra.db.compaction.CompactionIterator;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
 +import org.apache.cassandra.schema.MockSchema;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
 +{
 +    static final Logger logger = 
LoggerFactory.getLogger(PendingAntiCompactionTest.class);
 +
 +    private static class InstrumentedAcquisitionCallback extends 
PendingAntiCompaction.AcquisitionCallback // records which tables were submitted; starts no anti-compaction itself
 +    {
 +        public InstrumentedAcquisitionCallback(UUID parentRepairSession, 
RangesAtEndpoint ranges)
 +        {
 +            super(parentRepairSession, ranges, () -> false);
 +        }
 +
 +        Set<TableId> submittedCompactions = new HashSet<>(); // ids of the tables passed to submitPendingAntiCompaction
 +
 +        ListenableFuture<?> 
submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result)
 +        {
 +            submittedCompactions.add(result.cfs.metadata.id); // remember the table, then release the acquired result
 +            result.abort();  // prevent ref leak complaints
 +            return ListenableFutureTask.create(() -> {}, null);
 +        }
 +    }
 +
 +    /**
 +     * verify the pending anti compaction happy path: two sstables before the run, three after, two of them pending repair
 +     */
 +    @Test
 +    public void successCase() throws Exception
 +    {
 +        Assert.assertSame(ByteOrderedPartitioner.class, 
DatabaseDescriptor.getPartitioner().getClass());
 +        cfs.disableAutoCompaction();
 +
 +        // create 2 sstables, one that will be split, and another that will 
be moved
 +        for (int i = 0; i < 8; i++)
 +        {
 +            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
 +        }
 +        cfs.forceBlockingFlush(); // first sstable: keys 0..7
 +        for (int i = 8; i < 12; i++)
 +        {
 +            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
 +        }
 +        cfs.forceBlockingFlush(); // second sstable: keys 8..11
 +        assertEquals(2, cfs.getLiveSSTables().size());
 +
 +        Token left = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6));
 +        Token right = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16));
 +        List<ColumnFamilyStore> tables = Lists.newArrayList(cfs);
 +        Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(left, right));
 +
 +        // create a session so the anti compaction can find it
 +        UUID sessionID = UUIDGen.getTimeUUID();
 +        ActiveRepairService.instance.registerParentRepairSession(sessionID, 
InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, 
PreviewKind.NONE);
 +
 +        PendingAntiCompaction pac;
 +        ExecutorService executor = Executors.newSingleThreadExecutor();
 +        try
 +        {
 +            pac = new PendingAntiCompaction(sessionID, tables, 
atEndpoint(ranges, NO_RANGES), executor, () -> false);
 +            pac.run().get(); // block until the anti-compaction future completes
 +        }
 +        finally
 +        {
 +            executor.shutdown(); // the test shuts down the executor it handed to PendingAntiCompaction
 +        }
 +
 +        assertEquals(3, cfs.getLiveSSTables().size());
 +        int pendingRepair = 0; // number of live sstables marked pending repair
 +        for (SSTableReader sstable : cfs.getLiveSSTables())
 +        {
 +            if (sstable.isPendingRepair())
 +                pendingRepair++;
 +        }
 +        assertEquals(2, pendingRepair);
 +    }
 +
 +    @Test // acquiring over the ranges of three of six sstables must yield a transaction over exactly those three
 +    public void acquisitionSuccess() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(6);
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        List<SSTableReader> expected = sstables.subList(0, 3);
 +        Collection<Range<Token>> ranges = new HashSet<>(); // one range per expected sstable, from its first to its last token
 +        for (SSTableReader sstable : expected)
 +        {
 +            ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
 +        }
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID(), 
0, 0);
 +
 +        logger.info("SSTables: {}", sstables);
 +        logger.info("Expected: {}", expected);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNotNull(result);
 +        logger.info("Originals: {}", result.txn.originals());
 +        assertEquals(3, result.txn.originals().size());
 +        for (SSTableReader sstable : expected)
 +        {
 +            logger.info("Checking {}", sstable);
 +            assertTrue(result.txn.originals().contains(sstable));
 +        }
 +
 +        assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
 +        result.abort(); // release what the acquisition took
 +    }
 +
 +    @Test // of two sstables in range, only the one whose repair metadata was left untouched may be acquired
 +    public void repairedSSTablesAreNotAcquired() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        assertEquals(2, sstables.size());
 +        SSTableReader repaired = sstables.get(0);
 +        SSTableReader unrepaired = sstables.get(1);
 +        assertTrue(repaired.intersects(FULL_RANGE));
 +        assertTrue(unrepaired.intersects(FULL_RANGE));
 +
 +        
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor,
 1, null, false); // presumably repairedAt = 1 with no pending session - confirm argument meaning
 +        repaired.reloadSSTableMetadata(); // pick up the metadata written on the line above
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        logger.info("Originals: {}", result.txn.originals());
 +        assertEquals(1, result.txn.originals().size());
 +        assertTrue(result.txn.originals().contains(unrepaired));
 +        result.abort(); // release sstable refs
 +    }
 +
 +    @Test // an sstable pending repair for a finalized session is skipped; acquisition still succeeds with the other one
 +    public void finalizedPendingRepairSSTablesAreNotAcquired() throws 
Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        assertEquals(2, sstables.size());
 +        SSTableReader repaired = sstables.get(0); // despite the name, this one is made pending repair below
 +        SSTableReader unrepaired = sstables.get(1);
 +        assertTrue(repaired.intersects(FULL_RANGE));
 +        assertTrue(unrepaired.intersects(FULL_RANGE));
 +
 +        UUID sessionId = prepareSession();
 +        LocalSessionAccessor.finalizeUnsafe(sessionId); // the session is finalized before the sstable is tagged with it
 +        
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor,
 0, sessionId, false);
 +        repaired.reloadSSTableMetadata();
 +        assertTrue(repaired.isPendingRepair());
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        logger.info("Originals: {}", result.txn.originals());
 +        assertEquals(1, result.txn.originals().size());
 +        assertTrue(result.txn.originals().contains(unrepaired));
 +        result.abort();  // releases sstable refs
 +    }
 +
 +    @Test // an sstable pending repair for a session that was not finalized makes the acquisition return null
 +    public void conflictingSessionAcquisitionFailure() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
 +        assertEquals(2, sstables.size());
 +        SSTableReader repaired = sstables.get(0); // despite the name, this one is made pending repair below
 +        SSTableReader unrepaired = sstables.get(1);
 +        assertTrue(repaired.intersects(FULL_RANGE));
 +        assertTrue(unrepaired.intersects(FULL_RANGE));
 +
 +        UUID sessionId = prepareSession(); // unlike the finalized-session test, this session is not finalized here
 +        
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor,
 0, sessionId, false);
 +        repaired.reloadSSTableMetadata();
 +        assertTrue(repaired.isPendingRepair());
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNull(result); // null signals that the sstables could not be acquired
 +    }
 +
 +    @Test // with no live sstables, acquisition still returns a non-null result that can be aborted
 +    public void pendingRepairNoSSTablesExist() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +
 +        assertEquals(0, cfs.getLiveSSTables().size());
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        result.abort();  // There's nothing to release, but we should exit 
cleanly
 +    }
 +
 +    /**
 +     * anti compaction task should be submitted if everything is ok: one acquired result in, one submission for this table out
 +     */
 +    @Test
 +    public void callbackSuccess() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, 
NO_RANGES));
 +        assertTrue(cb.submittedCompactions.isEmpty()); // nothing submitted before apply()
 +        cb.apply(Lists.newArrayList(result));
 +
 +        assertEquals(1, cb.submittedCompactions.size());
 +        assertTrue(cb.submittedCompactions.contains(cfm.id));
 +    }
 +
 +    /**
 +     * If one of the supplied AcquireResults is null, either an Exception was 
thrown, or
 +     * we couldn't get a transaction for the sstables. In either case we need 
to cancel the repair, and release
 +     * any sstables acquired for other tables
 +     */
 +    @Test
 +    public void callbackNullResult() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNotNull(result);
 +        assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
 +
 +        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, 
Collections.emptyList()));
 +        assertTrue(cb.submittedCompactions.isEmpty());
 +        cb.apply(Lists.newArrayList(result, null)); // the null entry stands in for a failed acquisition
 +
 +        assertTrue(cb.submittedCompactions.isEmpty()); // nothing may be submitted
 +        assertEquals(Transactional.AbstractTransactional.State.ABORTED, 
result.txn.state());
 +    }
 +
 +    /**
 +     * If an AcquireResult has a null txn, there were no sstables to acquire 
references
 +     * for, so no anti compaction should have been submitted.
 +     */
 +    @Test
 +    public void callbackNullTxn() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        assertNotNull(result);
 +
 +        ColumnFamilyStore cfs2 = 
Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system",
 "peers").id);
 +        PendingAntiCompaction.AcquireResult fakeResult = new 
PendingAntiCompaction.AcquireResult(cfs2, null, null); // result for a second table, built with null in place of refs/txn
 +
 +        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, 
NO_RANGES));
 +        assertTrue(cb.submittedCompactions.isEmpty());
 +        cb.apply(Lists.newArrayList(result, fakeResult));
 +
 +        assertEquals(1, cb.submittedCompactions.size()); // only the table with a real result is submitted
 +        assertTrue(cb.submittedCompactions.contains(cfm.id));
 +        assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
 +    }
 +
 +
 +    @Test
 +    public void singleAnticompaction() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        UUID sessionID = UUIDGen.getTimeUUID();
 +        ActiveRepairService.instance.registerParentRepairSession(sessionID,
 +                                                                 
InetAddressAndPort.getByName("127.0.0.1"),
 +                                                                 
Lists.newArrayList(cfs),
 +                                                                 FULL_RANGE,
 +                                                                 true,0,
 +                                                                 true,
 +                                                                 
PreviewKind.NONE);
 +        CompactionManager.instance.performAnticompaction(result.cfs, 
atEndpoint(FULL_RANGE, NO_RANGES), result.refs, result.txn, sessionID, () -> 
false);
 +
 +    }
 +
 +    @Test (expected = CompactionInterruptedException.class)
 +    public void cancelledAntiCompaction() throws Exception
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(1);
 +
 +        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
 +        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
 +        UUID sessionID = UUIDGen.getTimeUUID();
 +        ActiveRepairService.instance.registerParentRepairSession(sessionID,
 +                                                                 
InetAddressAndPort.getByName("127.0.0.1"),
 +                                                                 
Lists.newArrayList(cfs),
 +                                                                 FULL_RANGE,
 +                                                                 true,0,
 +                                                                 true,
 +                                                                 
PreviewKind.NONE);
 +
 +        // attempt to anti-compact the sstable in half
 +        SSTableReader sstable = 
Iterables.getOnlyElement(cfs.getLiveSSTables());
 +        Token left = cfs.getPartitioner().midpoint(sstable.first.getToken(), 
sstable.last.getToken());
 +        Token right = sstable.last.getToken();
 +        CompactionManager.instance.performAnticompaction(result.cfs,
 +                                                         
atEndpoint(Collections.singleton(new Range<>(left, right)), NO_RANGES),
 +                                                         result.refs, 
result.txn, sessionID, () -> true);
 +    }
 +
 +    /**
 +     * Makes sure that PendingAntiCompaction fails when anticompaction throws 
exception
 +     */
 +    @Test
 +    public void antiCompactionException()
 +    {
 +        cfs.disableAutoCompaction();
 +        makeSSTables(2);
 +        UUID prsid = UUID.randomUUID();
 +        ListeningExecutorService es = 
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
 +        PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false) 
{
 +            @Override
 +            protected AcquisitionCallback getAcquisitionCallback(UUID prsId, 
RangesAtEndpoint tokenRanges)
 +            {
 +                return new AcquisitionCallback(prsid, tokenRanges, () -> 
false)
 +                {
 +                    @Override
 +                    ListenableFuture<?> 
submitPendingAntiCompaction(AcquireResult result)
 +                    {
 +                        Runnable r = new WrappedRunnable()
 +                        {
 +                            protected void runMayThrow()
 +                            {
 +                                throw new 
CompactionInterruptedException(null);
 +                            }
 +                        };
 +                        return es.submit(r);
 +                    }
 +                };
 +            }
 +        };
 +        ListenableFuture<?> fut = pac.run();
 +        try
 +        {
 +            fut.get();
 +            fail("Should throw exception");
 +        }
 +        catch(Throwable t)
 +        {
 +        }
 +    }
 +
 +    @Test
 +    public void testBlockedAcquisition() throws ExecutionException, 
InterruptedException, TimeoutException
 +    {
 +        cfs.disableAutoCompaction();
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +
 +        makeSSTables(2);
 +        UUID prsid = UUID.randomUUID();
 +        Set<SSTableReader> sstables = cfs.getLiveSSTables();
 +        List<ISSTableScanner> scanners = 
sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 +        try
 +        {
 +            try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
 +                 CompactionController controller = new 
CompactionController(cfs, sstables, 0);
 +                 CompactionIterator ci = 
CompactionManager.getAntiCompactionIterator(scanners, controller, 0, 
UUID.randomUUID(), CompactionManager.instance.active, () -> false))
 +            {
 +                // `ci` is our imaginary ongoing anticompaction which makes 
no progress until after 30s
 +                // now we try to start a new AC, which will try to cancel all 
ongoing compactions
 +
 +                CompactionManager.instance.active.beginCompaction(ci);
 +                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), 0, 0, es, () -> 
false);
 +                ListenableFuture fut = pac.run();
 +                try
 +                {
 +                    fut.get(30, TimeUnit.SECONDS);
 +                    fail("the future should throw exception since we try to 
start a new anticompaction when one is already running");
 +                }
 +                catch (ExecutionException e)
 +                {
 +                    assertTrue(e.getCause() instanceof 
PendingAntiCompaction.SSTableAcquisitionException);
 +                }
 +
 +                assertEquals(1, getCompactionsFor(cfs).size());
 +                for (CompactionInfo.Holder holder : getCompactionsFor(cfs))
 +                    assertFalse(holder.isStopRequested());
 +            }
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +            ISSTableScanner.closeAllAndPropagate(scanners, null);
 +        }
 +    }
 +
 +    private List<CompactionInfo.Holder> getCompactionsFor(ColumnFamilyStore 
cfs)
 +    {
 +        List<CompactionInfo.Holder> compactions = new ArrayList<>();
 +        for (CompactionInfo.Holder holder : 
CompactionManager.instance.active.getCompactions())
 +        {
 +            if 
(holder.getCompactionInfo().getTableMetadata().equals(cfs.metadata()))
 +                compactions.add(holder);
 +        }
 +        return compactions;
 +    }
 +
 +    @Test
 +    public void testUnblockedAcquisition() throws ExecutionException, 
InterruptedException
 +    {
 +        cfs.disableAutoCompaction();
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        makeSSTables(2);
 +        UUID prsid = prepareSession();
 +        Set<SSTableReader> sstables = cfs.getLiveSSTables();
 +        List<ISSTableScanner> scanners = 
sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 +        try
 +        {
 +            try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
 +                 CompactionController controller = new 
CompactionController(cfs, sstables, 0);
 +                 CompactionIterator ci = new 
CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, 
UUID.randomUUID()))
 +            {
 +                // `ci` is our imaginary ongoing anticompaction which makes 
no progress until after 5s
 +                // now we try to start a new AC, which will try to cancel all 
ongoing compactions
 +
 +                CompactionManager.instance.active.beginCompaction(ci);
 +                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false);
 +                ListenableFuture fut = pac.run();
 +                try
 +                {
 +                    fut.get(5, TimeUnit.SECONDS);
 +                }
 +                catch (TimeoutException e)
 +                {
 +                    // expected, we wait 1 minute for compactions to get 
cancelled in runWithCompactionsDisabled, but we are not iterating
 +                    // CompactionIterator so the compaction is not actually 
cancelled
 +                }
 +                try
 +                {
 +                    assertTrue(ci.hasNext());
 +                    ci.next();
 +                    fail("CompactionIterator should be abortable");
 +                }
 +                catch (CompactionInterruptedException e)
 +                {
 +                    CompactionManager.instance.active.finishCompaction(ci);
 +                    txn.abort();
 +                    // expected
 +                }
 +                CountDownLatch cdl = new CountDownLatch(1);
 +                Futures.addCallback(fut, new FutureCallback<Object>()
 +                {
 +                    public void onSuccess(@Nullable Object o)
 +                    {
 +                        cdl.countDown();
 +                    }
 +
 +                    public void onFailure(Throwable throwable)
 +                    {
 +                    }
 +                }, MoreExecutors.directExecutor());
 +                assertTrue(cdl.await(1, TimeUnit.MINUTES));
 +            }
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    @Test
 +    public void testSSTablePredicateOngoingAntiCompaction()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        cfs.disableAutoCompaction();
 +        List<SSTableReader> sstables = new ArrayList<>();
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> pendingSSTables = new ArrayList<>();
 +        for (int i = 1; i <= 10; i++)
 +        {
 +            SSTableReader sstable = MockSchema.sstable(i, i * 10, i * 10 + 9, 
cfs);
 +            sstables.add(sstable);
 +        }
 +        for (int i = 1; i <= 10; i++)
 +        {
 +            SSTableReader sstable = MockSchema.sstable(i + 10, i * 10, i * 10 
+ 9, cfs);
 +            AbstractPendingRepairTest.mutateRepaired(sstable, 
System.currentTimeMillis());
 +            repairedSSTables.add(sstable);
 +        }
 +        for (int i = 1; i <= 10; i++)
 +        {
 +            SSTableReader sstable = MockSchema.sstable(i + 20, i * 10, i * 10 
+ 9, cfs);
 +            AbstractPendingRepairTest.mutateRepaired(sstable, 
UUID.randomUUID(), false);
 +            pendingSSTables.add(sstable);
 +        }
 +
 +        cfs.addSSTables(sstables);
 +        cfs.addSSTables(repairedSSTables);
 +
 +        // if we are compacting the non-repaired non-pending sstables, we 
should get an error
 +        tryPredicate(cfs, sstables, null, true);
 +        // make sure we don't try to grab pending or repaired sstables;
 +        tryPredicate(cfs, repairedSSTables, sstables, false);
 +        tryPredicate(cfs, pendingSSTables, sstables, false);
 +    }
 +
 +    private void tryPredicate(ColumnFamilyStore cfs, List<SSTableReader> 
compacting, List<SSTableReader> expectedLive, boolean shouldFail)
 +    {
 +        CompactionInfo.Holder holder = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), 
OperationType.ANTICOMPACTION, 0, 1000, UUID.randomUUID(), compacting);
 +            }
++
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        CompactionManager.instance.active.beginCompaction(holder);
 +        try
 +        {
 +            PendingAntiCompaction.AntiCompactionPredicate predicate =
 +            new 
PendingAntiCompaction.AntiCompactionPredicate(Collections.singleton(new 
Range<>(new Murmur3Partitioner.LongToken(0), new 
Murmur3Partitioner.LongToken(100))),
 +                                                              
UUID.randomUUID());
 +            Set<SSTableReader> live = 
cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
 +            if (shouldFail)
 +                fail("should fail - we try to grab already anticompacting 
sstables for anticompaction");
 +            assertEquals(live, new HashSet<>(expectedLive));
 +        }
 +        catch (PendingAntiCompaction.SSTableAcquisitionException e)
 +        {
 +            if (!shouldFail)
 +                fail("We should not fail filtering sstables");
 +        }
 +        finally
 +        {
 +            CompactionManager.instance.active.finishCompaction(holder);
 +        }
 +    }
 +
 +    @Test
 +    public void testRetries() throws InterruptedException, ExecutionException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        cfs.addSSTable(MockSchema.sstable(1, true, cfs));
 +        CountDownLatch cdl = new CountDownLatch(5);
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        CompactionInfo.Holder holder = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), 
OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
 +            }
++
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        try
 +        {
 +            PendingAntiCompaction.AntiCompactionPredicate acp = new 
PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
 +            {
 +                @Override
 +                public boolean apply(SSTableReader sstable)
 +                {
 +                    cdl.countDown();
 +                    if (cdl.getCount() > 0)
 +                        throw new 
PendingAntiCompaction.SSTableAcquisitionException("blah");
 +                    return true;
 +                }
 +            };
 +            CompactionManager.instance.active.beginCompaction(holder);
 +            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = 
new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 10, 1, 
acp);
 +            Future f = es.submit(acquisitionCallable);
 +            cdl.await();
 +            assertNotNull(f.get());
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +            CompactionManager.instance.active.finishCompaction(holder);
 +        }
 +    }
 +
 +    @Test
 +    public void testRetriesTimeout() throws InterruptedException, 
ExecutionException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        cfs.addSSTable(MockSchema.sstable(1, true, cfs));
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        CompactionInfo.Holder holder = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), 
OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
 +            }
++
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        try
 +        {
 +            PendingAntiCompaction.AntiCompactionPredicate acp = new 
PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
 +            {
 +                @Override
 +                public boolean apply(SSTableReader sstable)
 +                {
 +                    throw new 
PendingAntiCompaction.SSTableAcquisitionException("blah");
 +                }
 +            };
 +            CompactionManager.instance.active.beginCompaction(holder);
 +            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = 
new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 2, 1000, 
acp);
 +            Future fut = es.submit(acquisitionCallable);
 +            assertNull(fut.get());
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +            CompactionManager.instance.active.finishCompaction(holder);
 +        }
 +    }
 +
 +    @Test
 +    public void testWith2i() throws ExecutionException, InterruptedException
 +    {
 +        cfs2.disableAutoCompaction();
 +        makeSSTables(2, cfs2, 100);
 +        ColumnFamilyStore idx = 
cfs2.indexManager.getAllIndexColumnFamilyStores().iterator().next();
 +        ExecutorService es = Executors.newFixedThreadPool(1);
 +        try
 +        {
 +            UUID prsid = prepareSession();
 +            for (SSTableReader sstable : cfs2.getLiveSSTables())
 +                assertFalse(sstable.isPendingRepair());
 +
 +            // mark the sstables pending, with a 2i compaction going, which 
should be untouched;
 +            try (LifecycleTransaction txn = 
idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
 +            {
 +                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs2), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> 
false);
 +                pac.run().get();
 +            }
 +            // and make sure it succeeded;
 +            for (SSTableReader sstable : cfs2.getLiveSSTables())
 +                assertTrue(sstable.isPendingRepair());
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, 
Collection<Range<Token>> trans)
 +    {
 +        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
 +        for (Range<Token> range : full)
 +            builder.add(new Replica(local, range, true));
 +
 +        for (Range<Token> range : trans)
 +            builder.add(new Replica(local, range, false));
 +
 +        return builder.build();
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 68ee3e1,b33ead2..d383e88
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -61,6 -57,7 +61,7 @@@ import org.apache.cassandra.utils.ByteB
  
  import static com.google.common.collect.ImmutableMap.of;
  import static java.util.Arrays.asList;
 -import static 
org.apache.cassandra.db.compaction.AntiCompactionTest.assertOnDiskState;
++import static org.apache.cassandra.Util.assertOnDiskState;
  import static 
org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
  import static 
org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
  import static 
org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
@@@ -68,7 -65,9 +69,8 @@@ import static org.junit.Assert.assertEq
  import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
  
 -
  @RunWith(OrderedJUnit4ClassRunner.class)
  public class IndexSummaryManagerTest
  {
@@@ -640,69 -633,91 +642,111 @@@
          final AtomicReference<CompactionInterruptedException> exception = new 
AtomicReference<>();
          // barrier to control when redistribution runs
          final CountDownLatch barrier = new CountDownLatch(1);
 +        CompactionInfo.Holder ongoingCompaction = new CompactionInfo.Holder()
 +        {
 +            public CompactionInfo getCompactionInfo()
 +            {
 +                return new CompactionInfo(cfs.metadata(), 
OperationType.UNKNOWN, 0, 0, UUID.randomUUID(), compacting);
 +            }
+ 
 -        Thread t = NamedThreadFactory.createThread(new Runnable()
++            public boolean isGlobal()
++            {
++                return false;
++            }
 +        };
 +        try (LifecycleTransaction ignored = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN))
          {
 -            public void run()
 +            
CompactionManager.instance.active.beginCompaction(ongoingCompaction);
 +
 +            Thread t = NamedThreadFactory.createThread(new Runnable()
              {
 -                try
 +                public void run()
                  {
 -                    // Don't leave enough space for even the minimal index 
summaries
 -                    try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
 +                    try
 +                    {
 +                        // Don't leave enough space for even the minimal 
index summaries
 +                        try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
 +                        {
 +                            IndexSummaryManager.redistributeSummaries(new 
ObservableRedistribution(of(cfs.metadata.id, txn),
 +                                                                              
                     0,
 +                                                                              
                     singleSummaryOffHeapSpace,
 +                                                                              
                     barrier));
 +                        }
 +                    }
 +                    catch (CompactionInterruptedException ex)
 +                    {
 +                        exception.set(ex);
 +                    }
 +                    catch (IOException ignored)
                      {
 -                        IndexSummaryManager.redistributeSummaries(new 
ObservableRedistribution(Collections.EMPTY_LIST,
 -                                                                              
                 of(cfs.metadata.cfId, txn),
 -                                                                              
                 singleSummaryOffHeapSpace,
 -                                                                              
                 barrier));
                      }
                  }
 -                catch (CompactionInterruptedException ex)
 -                {
 -                    exception.set(ex);
 -                }
 -                catch (IOException ignored)
 -                {
 -                }
 -            }
 -        });
 -        t.start();
 -        while (CompactionManager.instance.getActiveCompactions() == 0 && 
t.isAlive())
 -            Thread.sleep(1);
 -        // to ensure that the stop condition check in 
IndexSummaryRedistribution::redistributeSummaries
 -        // is made *after* the halt request is made to the CompactionManager, 
don't allow the redistribution
 -        // to proceed until stopCompaction has been called.
 -        cancelFunction.accept(cfs);
 -        // allows the redistribution to proceed
 -        barrier.countDown();
 -        t.join();
 +            });
 +
 +            t.start();
 +            while (CompactionManager.instance.getActiveCompactions() < 2 && 
t.isAlive())
 +                Thread.sleep(1);
 +            // to ensure that the stop condition check in 
IndexSummaryRedistribution::redistributeSummaries
 +            // is made *after* the halt request is made to the 
CompactionManager, don't allow the redistribution
 +            // to proceed until stopCompaction has been called.
 +            cancelFunction.accept(cfs);
 +            // allows the redistribution to proceed
 +            barrier.countDown();
 +            t.join();
 +        }
 +        finally
 +        {
 +            
CompactionManager.instance.active.finishCompaction(ongoingCompaction);
 +        }
  
          assertNotNull("Expected compaction interrupted exception", 
exception.get());
 -        assertTrue("Expected no active compactions", 
CompactionMetrics.getCompactions().isEmpty());
 +        assertTrue("Expected no active compactions", 
CompactionManager.instance.active.getCompactions().isEmpty());
  
 -        Set<SSTableReader> beforeRedistributionSSTables = new 
HashSet<>(sstables);
 +        Set<SSTableReader> beforeRedistributionSSTables = new 
HashSet<>(allSSTables);
          Set<SSTableReader> afterCancelSSTables = new 
HashSet<>(cfs.getLiveSSTables());
          Set<SSTableReader> disjoint = 
Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
          assertTrue(String.format("Mismatched files before and after 
cancelling redistribution: %s",
                                   Joiner.on(",").join(disjoint)),
                     disjoint.isEmpty());
-         Util.assertOnDiskState(cfs, 8);
 -
 -        assertOnDiskState(cfs, numSSTables);
++        assertOnDiskState(cfs, 8);
+         validateData(cfs, numRows);
+     }
+ 
+     @Test
+     public void testPauseIndexSummaryManager() throws Exception
+     {
+         String ksname = KEYSPACE1;
+         String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no 
key caching
+         Keyspace keyspace = Keyspace.open(ksname);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+         int numSSTables = 4;
+         int numRows = 256;
+         createSSTables(ksname, cfname, numSSTables, numRows);
+ 
+         List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+         for (SSTableReader sstable : sstables)
+             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+ 
+         long singleSummaryOffHeapSpace = 
sstables.get(0).getIndexSummaryOffHeapSize();
+ 
+         // everything should get cut in half
+         assert sstables.size() == numSSTables;
+         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN))
+         {
+             try (AutoCloseable toresume = 
CompactionManager.instance.pauseGlobalCompaction())
+             {
 -                sstables = redistributeSummaries(Collections.emptyList(), 
of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
++                sstables = redistributeSummaries(Collections.emptyList(), 
of(cfs.metadata().id, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+                 fail("The redistribution should fail - we got paused before 
adding to active compactions, but after marking compacting");
+             }
+         }
+         catch (CompactionInterruptedException e)
+         {
+             // expected
+         }
+         for (SSTableReader sstable : sstables)
+             assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
          validateData(cfs, numRows);
+         assertOnDiskState(cfs, numSSTables);
      }
  
      private static List<SSTableReader> 
redistributeSummaries(List<SSTableReader> compacting,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to