This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3ed1b798146e4536c854fd14bb6456ab29f78e82
Merge: 69b36a5 b773bc7
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Aug 7 10:20:27 2019 +0200

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |  1 +
 .../cassandra/db/compaction/CompactionManager.java | 67 ++++++++---------
 .../db/compaction/CompactionStrategyManager.java   | 36 ++++------
 .../cassandra/db/compaction/CompactionTasks.java   | 74 +++++++++++++++++++
 test/unit/org/apache/cassandra/Util.java           | 10 +--
 .../db/compaction/CompactionsBytemanTest.java      | 84 +++++++++++++++++++++-
 .../db/compaction/CompactionsPurgeTest.java        | 16 +++--
 .../db/compaction/PendingRepairManagerTest.java    | 16 +----
 8 files changed, 225 insertions(+), 79 deletions(-)

diff --cc CHANGES.txt
index 42080d7,dc8baf2..dad8d40
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,372 -1,5 +1,373 @@@
 +4.0
 + * Align load column in nodetool status output (CASSANDRA-14787)
 + * CassandraNetworkAuthorizer uses cached roles info (CASSANDRA-15089)
 + * Introduce optional timeouts for idle client sessions (CASSANDRA-11097)
 + * Fix AlterTableStatement dropped type validation order (CASSANDRA-15203)
 + * Update Netty dependencies to latest, clean up SocketFactory 
(CASSANDRA-15195)
 + * Native Transport - Apply noSpamLogger to ConnectionLimitHandler 
(CASSANDRA-15167)
 + * Reduce heap pressure during compactions (CASSANDRA-14654)
 + * Support building Cassandra with JDK 11 (CASSANDRA-15108)
 + * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710)
 + * Take sstable references before calculating approximate key count 
(CASSANDRA-14647)
 + * Restore snapshotting of system keyspaces on version change 
(CASSANDRA-14412)
 + * Fix AbstractBTreePartition locking in java 11 (CASSANDRA-14607)
 + * SimpleClient should pass connection properties as options (CASSANDRA-15056)
 + * Set repaired data tracking flag on range reads if enabled (CASSANDRA-15019)
 + * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802)
 + * Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033)
 + * Add Zstd compressor (CASSANDRA-14482)
 + * Fix IR prepare anti-compaction race (CASSANDRA-15027)
 + * Fix SimpleStrategy option validation (CASSANDRA-15007)
 + * Don't try to cancel 2i compactions when starting anticompaction 
(CASSANDRA-15024)
 + * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025)
 + * SSL Cert Hot Reloading should check for sanity of the new 
keystore/truststore before loading it (CASSANDRA-14991)
 + * Avoid leaking threads when failing anticompactions and rate limit 
anticompactions (CASSANDRA-15002)
 + * Validate token() arguments early instead of throwing NPE at execution 
(CASSANDRA-14989)
 + * Add a new tool to dump audit logs (CASSANDRA-14885)
 + * Fix generating javadoc with Java11 (CASSANDRA-14988)
 + * Only cancel conflicting compactions when starting anticompactions and sub 
range compactions (CASSANDRA-14935)
 + * Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938)
 + * Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
 + * Make antiCompactGroup throw exception on error and anticompaction non 
cancellable
 +   again (CASSANDRA-14936)
 + * Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849)
 + * Auto-expand replication_factor for NetworkTopologyStrategy 
(CASSANDRA-14303)
 + * Transient Replication: support EACH_QUORUM (CASSANDRA-14727)
 + * BufferPool: allocating thread for new chunks should acquire directly 
(CASSANDRA-14832)
 + * Send correct messaging version in internode messaging handshake's third 
message (CASSANDRA-14896)
 + * Make Read and Write Latency columns consistent for proxyhistograms and 
tablehistograms (CASSANDRA-11939)
 + * Make protocol checksum type option case insensitive (CASSANDRA-14716)
 + * Forbid re-adding static columns as regular and vice versa (CASSANDRA-14913)
 + * Audit log allows system keyspaces to be audited via configuration options 
(CASSANDRA-14498)
 + * Lower default chunk_length_in_kb from 64kb to 16kb (CASSANDRA-13241)
 + * Startup checker should wait for count rather than percentage 
(CASSANDRA-14297)
 + * Fix incorrect sorting of replicas in 
SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862)
 + * Partitioned outbound internode TCP connections can occur when nodes 
restart (CASSANDRA-14358)
 + * Don't write to system_distributed.repair_history, system_traces.sessions, 
system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841)
 + * Avoid running query to self through messaging service (CASSANDRA-14807)
 + * Allow using custom script for chronicle queue BinLog archival 
(CASSANDRA-14373)
 + * Transient->Full range movements mishandle consistency level upgrade 
(CASSANDRA-14759)
 + * ReplicaCollection follow-up (CASSANDRA-14726)
 + * Transient node receives full data requests (CASSANDRA-14762)
 + * Enable snapshot artifacts publish (CASSANDRA-12704)
 + * Introduce RangesAtEndpoint.unwrap to simplify 
StreamSession.addTransferRanges (CASSANDRA-14770)
 + * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout 
instead of Unavailable (CASSANDRA-14735)
 + * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
 + * Fail incremental repair prepare phase if it encounters sstables from 
un-finalized sessions (CASSANDRA-14763)
 + * Add a check for receiving digest response from transient node 
(CASSANDRA-14750)
 + * Fail query on transient replica if coordinator only expects full data 
(CASSANDRA-14704)
 + * Remove mentions of transient replication from repair path (CASSANDRA-14698)
 + * Fix handleRepairStatusChangedNotification to remove first then add 
(CASSANDRA-14720)
 + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
 + * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot 
returns wrong value for size() and incorrectly calculates count 
(CASSANDRA-14696)
 + * AbstractReplicaCollection equals and hash code should throw due to 
conflict between order sensitive/insensitive uses (CASSANDRA-14700)
 + * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
 + * Add checksumming to the native protocol (CASSANDRA-13304)
 + * Make AuthCache more easily extendable (CASSANDRA-14662)
 + * Extend RolesCache to include detailed role info (CASSANDRA-14497)
 + * Add fqltool compare (CASSANDRA-14619)
 + * Add fqltool replay (CASSANDRA-14618)
 + * Log keyspace in full query log (CASSANDRA-14656)
 + * Transient Replication and Cheap Quorums (CASSANDRA-14404)
 + * Log server-generated timestamp and nowInSeconds used by queries in FQL 
(CASSANDRA-14675)
 + * Add diagnostic events for read repairs (CASSANDRA-14668)
 + * Use consistent nowInSeconds and timestamps values within a request 
(CASSANDRA-14671)
 + * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
 + * Clean up Message.Request implementations (CASSANDRA-14677)
 + * Disable old native protocol versions on demand (CASSANDRA-14659)
 + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664)
 + * Improve BTree build performance by avoiding data copy (CASSANDRA-9989)
 + * Make monotonic read / read repair configurable (CASSANDRA-14635)
 + * Refactor CompactionStrategyManager (CASSANDRA-14621)
 + * Flush netty client messages immediately by default (CASSANDRA-13651)
 + * Improve read repair blocking behavior (CASSANDRA-10726)
 + * Add a virtual table to expose settings (CASSANDRA-14573)
 + * Fix up chunk cache handling of metrics (CASSANDRA-14628)
 + * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652)
 + * Incomplete handling of exceptions when decoding incoming messages 
(CASSANDRA-14574)
 + * Add diagnostic events for user audit logging (CASSANDRA-13668)
 + * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
 + * Add base classes for diagnostic events (CASSANDRA-13457)
 + * Clear view system metadata when dropping keyspace (CASSANDRA-14646)
 + * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase 
(CASSANDRA-14637)
 + * Make all existing virtual tables use LocalPartitioner (CASSANDRA-14640)
 + * Revert 4.0 GC alg back to CMS (CASSANDRA-14636)
 + * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627)
 + * Update netty to 4.1.28 (CASSANDRA-14633)
 + * Add a virtual table to expose thread pools (CASSANDRA-14523)
 + * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626)
 + * Fix toDate function for timestamp arguments (CASSANDRA-14502)
 + * Revert running dtests by default in circleci (CASSANDRA-14614)
 + * Stream entire SSTables when possible (CASSANDRA-14556)
 + * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592)
 + * Add experimental support for Java 11 (CASSANDRA-9608)
 + * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable 
(CASSANDRA-14580)
 + * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)
 + * Set broadcast address in internode messaging handshake (CASSANDRA-14579)
 + * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
 + * Make all DDL statements idempotent and not dependent on global state 
(CASSANDRA-13426)
 + * Bump the hints messaging version to match the current one (CASSANDRA-14536)
 + * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind 
errors in CircleCI (CASSANDRA-14546)
 + * Report why native_transport_port fails to bind (CASSANDRA-14544)
 + * Optimize internode messaging protocol (CASSANDRA-14485)
 + * Internode messaging handshake sends wrong messaging version number 
(CASSANDRA-14540)
 + * Add a virtual table to expose active client connections (CASSANDRA-14458)
 + * Clean up and refactor client metrics (CASSANDRA-14524)
 + * Nodetool import row cache invalidation races with adding sstables to 
tracker (CASSANDRA-14529)
 + * Fix assertions in LWTs after TableMetadata was made immutable 
(CASSANDRA-14356)
 + * Abort compactions quicker (CASSANDRA-14397)
 + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
 + * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
 + * Add option to sanity check tombstones on reads/compactions 
(CASSANDRA-14467)
 + * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
 + * Let nodetool import take a list of directories (CASSANDRA-14442)
 + * Avoid unneeded memory allocations / cpu for disabled log levels 
(CASSANDRA-14488)
 + * Implement virtual keyspace interface (CASSANDRA-7622)
 + * nodetool import cleanup and improvements (CASSANDRA-14417)
 + * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
 + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
 + * Audit logging for database activity (CASSANDRA-12151)
 + * Clean up build artifacts in docs container (CASSANDRA-14432)
 + * Minor network authz improvements (CASSANDRA-14413)
 + * Automatic sstable upgrades (CASSANDRA-14197)
 + * Replace deprecated junit.framework.Assert usages with org.junit.Assert 
(CASSANDRA-14431)
 + * Cassandra-stress throws NPE if insert section isn't specified in user 
profile (CASSANDRA-14426)
 + * List clients by protocol versions `nodetool clientstats --by-protocol` 
(CASSANDRA-14335)
 + * Improve LatencyMetrics performance by reducing write path processing 
(CASSANDRA-14281)
 + * Add network authz (CASSANDRA-13985)
 + * Use the correct IP/Port for Streaming when localAddress is left unbound 
(CASSANDRA-14389)
 + * nodetool listsnapshots is missing local system keyspace snapshots 
(CASSANDRA-14381)
 + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
 + * Rename nodetool --with-port to --print-port to disambiguate from --port 
(CASSANDRA-14392)
 + * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398)
 + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719)
 + * Eliminate background repair and probabilistic read_repair_chance table 
options
 +   (CASSANDRA-13910)
 + * Bind to correct local address in 4.0 streaming (CASSANDRA-14362)
 + * Use standard Amazon naming for datacenter and rack in Ec2Snitch 
(CASSANDRA-7839)
 + * Fix junit failure for SSTableReaderTest (CASSANDRA-14387)
 + * Abstract write path for pluggable storage (CASSANDRA-14118)
 + * nodetool describecluster should be more informative (CASSANDRA-13853)
 + * Compaction performance improvements (CASSANDRA-14261) 
 + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)
 + * Add options to nodetool tablestats to sort and limit output 
(CASSANDRA-13889)
 + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354)
 + * Add support for hybrid MIN(), MAX() speculative retry policies
 +   (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352)
 + * Fix some regressions caused by 14058 (CASSANDRA-14353)
 + * Abstract repair for pluggable storage (CASSANDRA-14116)
 + * Add meaningful toString() impls (CASSANDRA-13653)
 + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
 + * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
 + * Add coordinator write metric per CF (CASSANDRA-14232)
 + * Correct and clarify SSLFactory.getSslContext method and call sites 
(CASSANDRA-14314)
 + * Handle static and partition deletion properly on 
ThrottledUnfilteredIterator (CASSANDRA-14315)
 + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
 + * Add ability to specify driver name and version (CASSANDRA-14275)
 + * Abstract streaming for pluggable storage (CASSANDRA-14115)
 + * Forced incremental repairs should promote sstables if they can 
(CASSANDRA-14294)
 + * Use Murmur3 for validation compactions (CASSANDRA-14002)
 + * Comma at the end of the seed list is interpreted as localhost 
(CASSANDRA-14285)
 + * Refactor read executor and response resolver, abstract read repair 
(CASSANDRA-14058)
 + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
 + * Add a few options to nodetool verify (CASSANDRA-14201)
 + * CVE-2017-5929 Security vulnerability and redefine default log rotation 
policy (CASSANDRA-14183)
 + * Use JVM default SSL validation algorithm instead of custom default 
(CASSANDRA-13259)
 + * Better document in code InetAddressAndPort usage post 7544, incorporate 
port into UUIDGen node (CASSANDRA-14226)
 + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132)
 + * Make it possible to change neverPurgeTombstones during runtime 
(CASSANDRA-14214)
 + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174)
 + * Add nodetool clientlist (CASSANDRA-13665)
 + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
 + * Non-disruptive seed node list reload (CASSANDRA-14190)
 + * Nodetool tablehistograms to print statistics for all the tables 
(CASSANDRA-14185)
 + * Migrate dtests to use pytest and python3 (CASSANDRA-14134)
 + * Allow storage port to be configurable per node (CASSANDRA-7544)
 + * Make sub-range selection for non-frozen collections return null instead of 
empty (CASSANDRA-14182)
 + * BloomFilter serialization format should not change byte ordering 
(CASSANDRA-9067)
 + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)
 + * Delete temp test files on exit (CASSANDRA-14153)
 + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
 + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
 + * Fix cassandra-stress startup failure (CASSANDRA-14106)
 + * Remove initialDirectories from CFS (CASSANDRA-13928)
 + * Fix trivial log format error (CASSANDRA-14015)
 + * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
 + * Add option to optimise merkle tree comparison across replicas 
(CASSANDRA-3200)
 + * Remove unused and deprecated methods from AbstractCompactionStrategy 
(CASSANDRA-14081)
 + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
 + * Support a means of logging all queries as they were invoked 
(CASSANDRA-13983)
 + * Presize collections (CASSANDRA-13760)
 + * Add GroupCommitLogService (CASSANDRA-13530)
 + * Parallelize initial materialized view build (CASSANDRA-12245)
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt 
(CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild 
(CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes 
(CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation 
(CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming 
(CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf 
(CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times 
(CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests 
(CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM 
(CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 
(CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair 
(CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization 
(CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps 
(CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception 
(CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds 
(CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes 
(CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs 
(CASSANDRA-13789)
 + * Simplify mx4j configuration (CASSANDRA-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minimum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables 
(CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new 
Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped 
(CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache 
(CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config 
(CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from 
(CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause 
(CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool 
setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling 
(CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction 
(CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache 
plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads 
(CASSANDRA-13521)
 + * Bring back maxHintTTL property (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming 
(CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions 
(CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times 
(CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order 
(CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string 
(CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 in account when estimating remaining compaction 
tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements 
(CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples 
(CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from 
actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID 
functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers 
(CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService 
(CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files 
(CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair 
(CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace 
(CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter 
(CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication 
factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection 
(CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes 
(CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options 
(CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the 
repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 + * nodetool clearsnapshot requires --all to clear all snapshots 
(CASSANDRA-13391)
 + * Correctly count range tombstones in traces and tombstone thresholds 
(CASSANDRA-8527)
 + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243)
 +
 +
  3.11.5
+  * Make sure user defined compaction transactions are always closed 
(CASSANDRA-15123)
   * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config 
(CASSANDRA-14305)
   * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
   * Add flag to disable SASI indexes, and warnings on creation 
(CASSANDRA-14866)
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index d387701,7086d77..896fa2a
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -817,14 -745,14 +817,15 @@@ public class CompactionManager implemen
          FBUtilities.waitOnFutures(submitMaximal(cfStore, 
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()), splitOutput));
      }
  
++    @SuppressWarnings("resource") // the tasks are executed in parallel on 
the executor, making sure that they get closed
      public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, 
final int gcBefore, boolean splitOutput)
      {
          // here we compute the task off the compaction executor, so having 
that present doesn't
          // confuse runWithCompactionsDisabled -- i.e., we don't want to 
deadlock ourselves, waiting
          // for ourselves to finish/acknowledge cancellation before continuing.
--        final Collection<AbstractCompactionTask> tasks = 
cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput);
++        CompactionTasks tasks = 
cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput);
  
--        if (tasks == null)
++        if (tasks.isEmpty())
              return Collections.emptyList();
  
          List<Future<?>> futures = new ArrayList<>();
@@@ -850,42 -778,45 +851,42 @@@
          if (nonEmptyTasks > 1)
              logger.info("Major compaction will not result in a single sstable 
- repaired and unrepaired data is kept separate and compaction runs per 
data_file_directory.");
  
--
          return futures;
      }
  
      public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, 
Collection<Range<Token>> ranges)
      {
-         final Collection<AbstractCompactionTask> tasks = 
cfStore.runWithCompactionsDisabled(() ->
-                    {
-                        Collection<SSTableReader> sstables = 
sstablesInBounds(cfStore, ranges);
-                        if (sstables == null || sstables.isEmpty())
-                        {
-                            logger.debug("No sstables found for the provided 
token range");
-                            return null;
-                        }
-                        return 
cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, 
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
-                    }, (sstable) -> new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges), false, false, false);
- 
-         if (tasks == null)
-             return;
- 
-         Runnable runnable = new WrappedRunnable()
-         {
-             protected void runMayThrow()
 -        Callable<Collection<AbstractCompactionTask>> taskCreator = () -> {
++        Callable<CompactionTasks> taskCreator = () -> {
+             Collection<SSTableReader> sstables = sstablesInBounds(cfStore, 
ranges);
+             if (sstables == null || sstables.isEmpty())
              {
-                 for (AbstractCompactionTask task : tasks)
-                     if (task != null)
-                         task.execute(active);
+                 logger.debug("No sstables found for the provided token 
range");
 -                return null;
++                return CompactionTasks.empty();
              }
+             return 
cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, 
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
          };
  
-         if (executor.isShutdown())
 -        final Collection<AbstractCompactionTask> tasks = 
cfStore.runWithCompactionsDisabled(taskCreator, false, false);
 -
 -        if (tasks == null)
 -            return;
 -
 -        Runnable runnable = new WrappedRunnable()
++        try (CompactionTasks tasks = 
cfStore.runWithCompactionsDisabled(taskCreator,
++                                                                        
(sstable) -> new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges),
++                                                                        false,
++                                                                        false,
++                                                                        
false))
          {
-             logger.info("Compaction executor has shut down, not submitting 
task");
-             return;
 -            protected void runMayThrow() throws Exception
++            if (tasks.isEmpty())
++                return;
++
++            Runnable runnable = new WrappedRunnable()
+             {
 -                try
++                protected void runMayThrow()
+                 {
+                     for (AbstractCompactionTask task : tasks)
+                         if (task != null)
 -                            task.execute(metrics);
++                            task.execute(active);
+                 }
 -                finally
 -                {
 -                    FBUtilities.closeAll(tasks.stream().map(task -> 
task.transaction).collect(Collectors.toList()));
 -                }
 -            }
 -        };
++            };
+ 
 -        FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, "force 
compaction for token range"));
++            FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, 
"force compaction for token range"));
 +        }
-         FBUtilities.waitOnFuture(executor.submit(runnable));
      }
  
      private static Collection<SSTableReader> 
sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> 
tokenRangeCollection)
@@@ -1015,12 -943,19 +1016,14 @@@
                  }
                  else
                  {
--                    List<AbstractCompactionTask> tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
-                     for (AbstractCompactionTask task : tasks)
 -                    try
++                    try (CompactionTasks tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore))
                      {
-                         if (task != null)
-                             task.execute(active);
+                         for (AbstractCompactionTask task : tasks)
+                         {
+                             if (task != null)
 -                                task.execute(metrics);
++                                task.execute(active);
+                         }
                      }
 -                    finally
 -                    {
 -                        FBUtilities.closeAll(tasks.stream().map(task -> 
task.transaction).collect(Collectors.toList()));
 -                    }
                  }
              }
          };
diff --cc 
src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index b978641,86170a1..fd4dbeb
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -18,18 -18,8 +18,17 @@@
  package org.apache.cassandra.db.compaction;
  
  
 -import java.util.*;
 -import java.util.concurrent.Callable;
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.ConcurrentModificationException;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
- import java.util.concurrent.Callable;
  import java.util.concurrent.locks.ReentrantReadWriteLock;
  import java.util.function.Supplier;
  import java.util.stream.Collectors;
@@@ -954,36 -805,45 +953,29 @@@ public class CompactionStrategyManager 
          {
              readLock.unlock();
          }
 -
      }
  
--    public Collection<AbstractCompactionTask> getMaximalTasks(final int 
gcBefore, final boolean splitOutput)
++    public CompactionTasks getMaximalTasks(final int gcBefore, final boolean 
splitOutput)
      {
          maybeReloadDiskBoundaries();
          // runWithCompactionsDisabled cancels active compactions and disables 
them, then we are able
          // to make the repaired/unrepaired strategies mark their own sstables 
as compacting. Once the
          // sstables are marked the compactions are re-enabled
--        return cfs.runWithCompactionsDisabled(new 
Callable<Collection<AbstractCompactionTask>>()
--        {
--            @Override
--            public Collection<AbstractCompactionTask> call()
++        return cfs.runWithCompactionsDisabled(() -> {
++            List<AbstractCompactionTask> tasks = new ArrayList<>();
++            readLock.lock();
++            try
              {
--                List<AbstractCompactionTask> tasks = new ArrayList<>();
--                readLock.lock();
--                try
--                {
-                     for (AbstractStrategyHolder holder : holders)
 -                    for (AbstractCompactionStrategy strategy : repaired)
--                    {
-                         tasks.addAll(holder.getMaximalTasks(gcBefore, 
splitOutput));
 -                        Collection<AbstractCompactionTask> task = 
strategy.getMaximalTask(gcBefore, splitOutput);
 -                        if (task != null)
 -                            tasks.addAll(task);
 -                    }
 -                    for (AbstractCompactionStrategy strategy : unrepaired)
 -                    {
 -                        Collection<AbstractCompactionTask> task = 
strategy.getMaximalTask(gcBefore, splitOutput);
 -                        if (task != null)
 -                            tasks.addAll(task);
--                    }
--                }
--                finally
++                for (AbstractStrategyHolder holder : holders)
                  {
--                    readLock.unlock();
++                    tasks.addAll(holder.getMaximalTasks(gcBefore, 
splitOutput));
                  }
--                if (tasks.isEmpty())
--                    return null;
--                return tasks;
              }
++            finally
++            {
++                readLock.unlock();
++            }
++            return CompactionTasks.create(tasks);
          }, false, false);
      }
  
@@@ -996,19 -856,37 +988,19 @@@
       * @param gcBefore gc grace period, throw away tombstones older than this
       * @return a list of compaction tasks corresponding to the sstables 
requested
       */
--    public List<AbstractCompactionTask> 
getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
 -    {
 -        return getUserDefinedTasks(sstables, gcBefore, false);
 -    }
 -
 -    public List<AbstractCompactionTask> 
getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore, boolean 
validateForCompaction)
++    public CompactionTasks getUserDefinedTasks(Collection<SSTableReader> 
sstables, int gcBefore)
      {
          maybeReloadDiskBoundaries();
          List<AbstractCompactionTask> ret = new ArrayList<>();
          readLock.lock();
          try
          {
 -            if (validateForCompaction)
 -                validateForCompaction(sstables);
 -
 -            Map<Integer, List<SSTableReader>> repairedSSTables = 
sstables.stream()
 -                                                                         
.filter(s -> !s.isMarkedSuspect() && s.isRepaired())
 -                                                                         
.collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 -
 -            Map<Integer, List<SSTableReader>> unrepairedSSTables = 
sstables.stream()
 -                                                                           
.filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
 -                                                                           
.collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 -
 -
 -            for (Map.Entry<Integer, List<SSTableReader>> group : 
repairedSSTables.entrySet())
 -                
ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), 
gcBefore));
 -
 -            for (Map.Entry<Integer, List<SSTableReader>> group : 
unrepairedSSTables.entrySet())
 -                
ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), 
gcBefore));
 -
 -            return ret;
 +            List<GroupedSSTableContainer> groupedSSTables = 
groupSSTables(sstables);
 +            for (int i = 0; i < holders.size(); i++)
 +            {
 +                
ret.addAll(holders.get(i).getUserDefinedTasks(groupedSSTables.get(i), 
gcBefore));
 +            }
-             return ret;
++            return CompactionTasks.create(ret);
          }
          finally
          {
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTasks.java
index 0000000,0000000..af0dbd0
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTasks.java
@@@ -1,0 -1,0 +1,74 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.db.compaction;
++
++import java.util.AbstractCollection;
++import java.util.Collection;
++import java.util.Collections;
++import java.util.Iterator;
++import java.util.stream.Collectors;
++
++import org.apache.cassandra.utils.FBUtilities;
++
++public class CompactionTasks extends 
AbstractCollection<AbstractCompactionTask> implements AutoCloseable
++{
++    @SuppressWarnings("resource")
++    private static final CompactionTasks EMPTY = new 
CompactionTasks(Collections.emptyList());
++
++    private final Collection<AbstractCompactionTask> tasks;
++
++    private CompactionTasks(Collection<AbstractCompactionTask> tasks)
++    {
++        this.tasks = tasks;
++    }
++
++    public static CompactionTasks create(Collection<AbstractCompactionTask> 
tasks)
++    {
++        if (tasks == null || tasks.isEmpty())
++            return EMPTY;
++        return new CompactionTasks(tasks);
++    }
++
++    public static CompactionTasks empty()
++    {
++        return EMPTY;
++    }
++
++    public Iterator<AbstractCompactionTask> iterator()
++    {
++        return tasks.iterator();
++    }
++
++    public int size()
++    {
++        return tasks.size();
++    }
++
++    public void close()
++    {
++        try
++        {
++            FBUtilities.closeAll(tasks.stream().map(task -> 
task.transaction).collect(Collectors.toList()));
++        }
++        catch (Exception e)
++        {
++            throw new RuntimeException(e);
++        }
++    }
++}
diff --cc test/unit/org/apache/cassandra/Util.java
index df45f3c,006cd76..3dcaff7
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -41,14 -39,8 +41,14 @@@ import org.apache.commons.lang3.StringU
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
++import org.apache.cassandra.db.compaction.CompactionTasks;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.ReplicaCollection;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.ColumnIdentifier;
  
@@@ -247,9 -241,9 +247,11 @@@ public class Uti
      public static void compact(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables)
      {
          int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
--        List<AbstractCompactionTask> tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
--        for (AbstractCompactionTask task : tasks)
-             task.execute(ActiveCompactionsTracker.NOOP);
 -            task.execute(null);
++        try (CompactionTasks tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore))
++        {
++            for (AbstractCompactionTask task : tasks)
++                task.execute(ActiveCompactionsTracker.NOOP);
++        }
      }
  
      public static void expectEOF(Callable<?> callable)
diff --cc 
test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
index 2519389,d5f2800..95069f1
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@@ -18,16 -18,27 +18,24 @@@
  
  package org.apache.cassandra.db.compaction;
  
 -import java.io.IOException;
 +import java.util.concurrent.TimeUnit;
+ import java.util.Collection;
+ import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.Set;
+ import java.util.function.Consumer;
+ import java.util.stream.Collectors;
  
  import org.junit.Test;
  import org.junit.runner.RunWith;
  
  import org.apache.cassandra.cql3.CQLTester;
  import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.Range;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.metrics.CompactionMetrics;
  import org.apache.cassandra.utils.FBUtilities;
  import org.jboss.byteman.contrib.bmunit.BMRule;
 +import org.jboss.byteman.contrib.bmunit.BMRules;
  import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
  
  import static org.junit.Assert.assertEquals;
@@@ -124,21 -69,87 +134,93 @@@ public class CompactionsBytemanTest ext
          assertEquals(0, CompactionManager.instance.compactingCF.count(cfs));
      }
  
 +    private void createPossiblyExpiredSSTable(final ColumnFamilyStore cfs, 
final boolean expired) throws Throwable
 +    {
 +        if (expired)
 +        {
 +            execute("INSERT INTO %s (id, val) values (1, 'expired') USING TTL 
1");
 +            Thread.sleep(TimeUnit.SECONDS.toMillis((long)1.5));
 +        }
 +        else
 +        {
 +            execute("INSERT INTO %s (id, val) values (2, 'immortal')");
 +        }
 +        cfs.forceBlockingFlush();
 +    }
 +
 +    private void createLowGCGraceTable(){
 +        createTable("CREATE TABLE %s (id int PRIMARY KEY, val text) with 
compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND 
gc_grace_seconds=0");
 +    }
- }
++
+     @Test
+     @BMRule(name = "Stop all compactions",
+     targetClass = "CompactionTask",
+     targetMethod = "runMayThrow",
+     targetLocation = "AT INVOKE getCompactionAwareWriter",
+     action = "$ci.stop()")
+     public void testStopUserDefinedCompactionRepaired() throws Throwable
+     {
+         testStopCompactionRepaired((cfs) -> {
+             Collection<Descriptor> files = 
cfs.getLiveSSTables().stream().map(s -> 
s.descriptor).collect(Collectors.toList());
+             
FBUtilities.waitOnFuture(CompactionManager.instance.submitUserDefined(cfs, 
files, CompactionManager.NO_GC));
+         });
+     }
+ 
+     @Test
+     @BMRule(name = "Stop all compactions",
+     targetClass = "CompactionTask",
+     targetMethod = "runMayThrow",
+     targetLocation = "AT INVOKE getCompactionAwareWriter",
+     action = "$ci.stop()")
+     public void testStopSubRangeCompactionRepaired() throws Throwable
+     {
+         testStopCompactionRepaired((cfs) -> {
+             Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(cfs.getPartitioner().getMinimumToken(),
+                                                                               
  cfs.getPartitioner().getMaximumToken()));
+             CompactionManager.instance.forceCompactionForTokenRange(cfs, 
ranges);
+         });
+     }
+ 
+     public void testStopCompactionRepaired(Consumer<ColumnFamilyStore> 
compactionRunner) throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s (k INT, c INT, v INT, 
PRIMARY KEY (k, c))");
+         ColumnFamilyStore cfs = 
Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(table);
+         cfs.disableAutoCompaction();
+         for (int i = 0; i < 5; i++)
+         {
+             for (int j = 0; j < 10; j++)
+             {
+                 execute("insert into %s (k, c, v) values (?, ?, ?)", i, j, 
i*j);
+             }
+             cfs.forceBlockingFlush();
+         }
 -        setRepaired(cfs, cfs.getLiveSSTables());
++        
cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), 
System.currentTimeMillis(), null, false);
+         for (int i = 0; i < 5; i++)
+         {
+             for (int j = 0; j < 10; j++)
+             {
+                 execute("insert into %s (k, c, v) values (?, ?, ?)", i, j, 
i*j);
+             }
+             cfs.forceBlockingFlush();
+         }
+ 
+         assertTrue(cfs.getTracker().getCompacting().isEmpty());
 -        assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h -> 
h.getCompactionInfo().getCFMetaData().equals(cfs.metadata)));
++        
assertTrue(CompactionManager.instance.active.getCompactions().stream().noneMatch(h
 -> h.getCompactionInfo().getTableMetadata().equals(cfs.metadata)));
+ 
+         try
+         {
+             compactionRunner.accept(cfs);
+             fail("compaction should fail");
+         }
+         catch (RuntimeException t)
+         {
+             if (!(t.getCause().getCause() instanceof 
CompactionInterruptedException))
+                 throw t;
+             //expected
+         }
+ 
+         assertTrue(cfs.getTracker().getCompacting().isEmpty());
 -        assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h -> 
h.getCompactionInfo().getCFMetaData().equals(cfs.metadata)));
 -
 -    }
++        
assertTrue(CompactionManager.instance.active.getCompactions().stream().noneMatch(h
 -> h.getCompactionInfo().getTableMetadata().equals(cfs.metadata)));
+ 
 -    private void setRepaired(ColumnFamilyStore cfs, Iterable<SSTableReader> 
sstables) throws IOException
 -    {
 -        Set<SSTableReader> changed = new HashSet<>();
 -        for (SSTableReader sstable: sstables)
 -        {
 -            
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 
System.currentTimeMillis());
 -            sstable.reloadSSTableMetadata();
 -            changed.add(sstable);
 -        }
 -        cfs.getTracker().notifySSTableRepairedStatusChanged(changed);
+     }
+ }
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index dcd5270,f5b1641..a0d52aa
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@@ -19,9 -19,9 +19,9 @@@
  package org.apache.cassandra.db.compaction;
  
  import java.util.Collection;
--import java.util.List;
  import java.util.concurrent.ExecutionException;
  
++import com.google.common.collect.Iterables;
  import org.junit.BeforeClass;
  import org.junit.Test;
  
@@@ -303,9 -298,9 +303,10 @@@ public class CompactionsPurgeTes
                  .build().applyUnsafe();
  
          cfs.forceBlockingFlush();
--        List<AbstractCompactionTask> tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, 
Integer.MAX_VALUE);
--        assertEquals(1, tasks.size());
-         tasks.get(0).execute(ActiveCompactionsTracker.NOOP);
 -        tasks.get(0).execute(null);
++        try (CompactionTasks tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, 
Integer.MAX_VALUE))
++        {
++            
Iterables.getOnlyElement(tasks).execute(ActiveCompactionsTracker.NOOP);
++        }
  
          // verify that minor compaction does GC when key is provably not
          // present in a non-compacted sstable
@@@ -354,9 -349,9 +355,10 @@@
          cfs.forceBlockingFlush();
  
          // compact the sstables with the c1/c2 data and the c1 tombstone
--        List<AbstractCompactionTask> tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, 
Integer.MAX_VALUE);
--        assertEquals(1, tasks.size());
-         tasks.get(0).execute(ActiveCompactionsTracker.NOOP);
 -        tasks.get(0).execute(null);
++        try (CompactionTasks tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, 
Integer.MAX_VALUE))
++        {
++            
Iterables.getOnlyElement(tasks).execute(ActiveCompactionsTracker.NOOP);
++        }
  
          // We should have both the c1 and c2 tombstones still. Since the min 
timestamp in the c2 tombstone
          // sstable is older than the c1 tombstone, it is invalid to throw out 
the c1 tombstone.
diff --cc 
test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
index 4e645fd,0000000..9f4cf8d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@@ -1,318 -1,0 +1,308 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.util.Collection;
 +import java.util.Collections;
- import java.util.List;
 +import java.util.UUID;
 +
 +import com.google.common.collect.Lists;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +public class PendingRepairManagerTest extends AbstractPendingRepairTest
 +{
 +    /**
 +     * If a local session is ongoing, it should not be cleaned up
 +     */
 +    @Test
 +    public void needsCleanupInProgress()
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false);
 +        prm.addSSTable(sstable);
 +        Assert.assertNotNull(prm.get(repairID));
 +
 +        Assert.assertFalse(prm.canCleanup(repairID));
 +    }
 +
 +    /**
 +     * If a local session is finalized, it should be cleaned up
 +     */
 +    @Test
 +    public void needsCleanupFinalized()
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false);
 +        prm.addSSTable(sstable);
 +        Assert.assertNotNull(prm.get(repairID));
 +        LocalSessionAccessor.finalizeUnsafe(repairID);
 +
 +        Assert.assertTrue(prm.canCleanup(repairID));
 +    }
 +
 +    /**
 +     * If a local session has failed, it should be cleaned up
 +     */
 +    @Test
 +    public void needsCleanupFailed()
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false);
 +        prm.addSSTable(sstable);
 +        Assert.assertNotNull(prm.get(repairID));
 +        LocalSessionAccessor.failUnsafe(repairID);
 +
 +        Assert.assertTrue(prm.canCleanup(repairID));
 +    }
 +
 +    @Test
 +    public void needsCleanupNoSession()
 +    {
 +        UUID fakeID = UUIDGen.getTimeUUID();
 +        PendingRepairManager prm = new PendingRepairManager(cfs, null, false);
 +        Assert.assertTrue(prm.canCleanup(fakeID));
 +    }
 +
 +    @Test
 +    public void estimateRemainingTasksInProgress()
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false);
 +        prm.addSSTable(sstable);
 +        Assert.assertNotNull(prm.get(repairID));
 +
 +        Assert.assertEquals(0, prm.getEstimatedRemainingTasks());
 +        Assert.assertEquals(0, prm.getNumPendingRepairFinishedTasks());
 +    }
 +
 +    @Test
 +    public void estimateRemainingFinishedRepairTasks()
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false);
 +        prm.addSSTable(sstable);
 +        Assert.assertNotNull(prm.get(repairID));
 +        Assert.assertNotNull(prm.get(repairID));
 +        LocalSessionAccessor.finalizeUnsafe(repairID);
 +
 +        Assert.assertEquals(0, prm.getEstimatedRemainingTasks());
 +        Assert.assertEquals(1, prm.getNumPendingRepairFinishedTasks());
 +    }
 +
 +    @Test
 +    public void getNextBackgroundTask()
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false);
 +        prm.addSSTable(sstable);
 +
 +        repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false);
 +        prm.addSSTable(sstable);
 +        LocalSessionAccessor.finalizeUnsafe(repairID);
 +
 +        Assert.assertEquals(2, prm.getSessions().size());
 +        
Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds()));
 +        AbstractCompactionTask compactionTask = 
prm.getNextRepairFinishedTask();
 +        try
 +        {
 +            Assert.assertNotNull(compactionTask);
 +            
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, 
compactionTask.getClass());
 +            PendingRepairManager.RepairFinishedCompactionTask cleanupTask = 
(PendingRepairManager.RepairFinishedCompactionTask) compactionTask;
 +            Assert.assertEquals(repairID, cleanupTask.getSessionID());
 +        }
 +        finally
 +        {
 +            compactionTask.transaction.abort();
 +        }
 +    }
 +
 +    @Test
 +    public void getNextBackgroundTaskNoSessions()
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +        
Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds()));
 +    }
 +
 +    /**
 +     * If all sessions should be cleaned up, getNextBackgroundTask should 
return null
 +     */
 +    @Test
 +    public void getNextBackgroundTaskAllCleanup() throws Exception
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false); // presumably marks the sstable as pending repair for this session - confirm against mutateRepaired
 +        prm.addSSTable(sstable);
 +        Assert.assertNotNull(prm.get(repairID)); // prm now has an entry for the session
 +        Assert.assertNotNull(prm.get(repairID)); // NOTE(review): repeats the previous assertion verbatim - confirm whether a different check was intended
 +        LocalSessionAccessor.finalizeUnsafe(repairID); // presumably puts the session into the "should be cleaned up" state - confirm against LocalSessionAccessor
 +
 +        
Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds())); // the only session was finalized above, so null is expected
 +
 +    }
 +
 +    @Test
 +    public void maximalTaskNeedsCleanup() // getMaximalTasks should return exactly one task for the single session finalized below
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false); // presumably marks the sstable as pending repair for this session - confirm against mutateRepaired
 +        prm.addSSTable(sstable);
 +        Assert.assertNotNull(prm.get(repairID)); // prm now has an entry for the session
 +        Assert.assertNotNull(prm.get(repairID)); // NOTE(review): repeats the previous assertion verbatim - confirm whether a different check was intended
 +        LocalSessionAccessor.finalizeUnsafe(repairID);
 +
 +        Collection<AbstractCompactionTask> tasks = 
prm.getMaximalTasks(FBUtilities.nowInSeconds(), false);
 +        try
 +        {
 +            Assert.assertEquals(1, tasks.size());
 +        }
 +        finally
 +        {
 +            tasks.stream().forEach(t -> t.transaction.abort()); // abort every returned task's transaction whether or not the assertion passed
 +        }
 +    }
 +
 +    @Test
 +    public void userDefinedTaskTest() // one sstable assigned to one repair session should yield exactly one user defined task
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +        UUID repairId = registerSession(cfs, true, true);
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairId, false); // presumably marks the sstable as pending repair for this session - confirm against mutateRepaired
 +        prm.addSSTable(sstable);
-         List<AbstractCompactionTask> tasks = 
csm.getUserDefinedTasks(Collections.singleton(sstable), 100);
-         try
++
++        try (CompactionTasks tasks = 
csm.getUserDefinedTasks(Collections.singleton(sstable), 100)) // try-with-resources closes the CompactionTasks on exit; it replaces the removed finally/abort block
 +        {
 +            Assert.assertEquals(1, tasks.size());
 +        }
-         finally
-         {
-             tasks.stream().forEach(t -> t.transaction.abort());
-         }
 +    }
 +
 +    @Test
 +    public void mixedPendingSessionsTest() // two sstables assigned to two different repair sessions should yield two user defined tasks
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +        UUID repairId = registerSession(cfs, true, true);
 +        UUID repairId2 = registerSession(cfs, true, true);
 +        SSTableReader sstable = makeSSTable(true);
 +        SSTableReader sstable2 = makeSSTable(true);
 +
 +        mutateRepaired(sstable, repairId, false); // each sstable is associated with its own session id
 +        mutateRepaired(sstable2, repairId2, false);
 +        prm.addSSTable(sstable);
 +        prm.addSSTable(sstable2);
-         List<AbstractCompactionTask> tasks = 
csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100);
-         try
++        try (CompactionTasks tasks = 
csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100)) // try-with-resources closes the CompactionTasks on exit; it replaces the removed finally/abort block
 +        {
 +            Assert.assertEquals(2, tasks.size());
 +        }
-         finally
-         {
-             tasks.stream().forEach(t -> t.transaction.abort());
-         }
 +    }
 +
 +    /**
 +     * Tests that an IllegalSSTableArgumentException is thrown if we try to get
 +     * scanners for an sstable that isn't pending repair
 +     */
 +    @Test(expected = 
PendingRepairManager.IllegalSSTableArgumentException.class)
 +    public void getScannersInvalidSSTable() throws Exception
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +        SSTableReader sstable = makeSSTable(true); // never passed to mutateRepaired or prm.addSSTable in this test
 +        prm.getScanners(Collections.singleton(sstable), 
Collections.singleton(RANGE1)); // expected to throw; see the @Test(expected = ...) annotation
 +    }
 +
 +    /**
 +     * Tests that an IllegalSSTableArgumentException is thrown if we call
 +     * getOrCreate with an sstable that isn't pending repair
 +     */
 +    @Test(expected = 
PendingRepairManager.IllegalSSTableArgumentException.class)
 +    public void getOrCreateInvalidSSTable() throws Exception
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +        SSTableReader sstable = makeSSTable(true); // never passed to mutateRepaired or prm.addSSTable in this test
 +        prm.getOrCreate(sstable); // expected to throw; see the @Test(expected = ...) annotation
 +    }
 +
 +    @Test
 +    public void sessionHasData() // hasDataForSession should flip from false to true once an sstable for the session is added to prm
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +
 +        UUID repairID = registerSession(cfs, true, true);
 +        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 +
 +        Assert.assertFalse(prm.hasDataForSession(repairID)); // nothing has been added for this session yet
 +        SSTableReader sstable = makeSSTable(true);
 +        mutateRepaired(sstable, repairID, false); // presumably marks the sstable as pending repair for this session - confirm against mutateRepaired
 +        prm.addSSTable(sstable);
 +        Assert.assertTrue(prm.hasDataForSession(repairID)); // the sstable added above now counts as data for the session
 +    }
 +
 +    @Test
 +    public void noEmptyCompactionTask() // after truncation, the strategy manager should not hand out a background task
 +    {
 +        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 +        SSTableReader sstable = makeSSTable(false); // NOTE(review): the other tests in this chunk pass true here; the flag's meaning is not visible from this chunk
 +        UUID id = UUID.randomUUID(); // random id; unlike the other tests, registerSession is not called here
 +        mutateRepaired(sstable, id, false);
 +        prm.getOrCreate(sstable);
 +        cfs.truncateBlocking(); // the next assertion expects no live sstables to remain after this call
 +        
Assert.assertFalse(cfs.getSSTables(SSTableSet.LIVE).iterator().hasNext());
 +        
Assert.assertNull(cfs.getCompactionStrategyManager().getNextBackgroundTask(0)); // with no live sstables, no background task is expected
 +
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to