This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 3ed1b798146e4536c854fd14bb6456ab29f78e82 Merge: 69b36a5 b773bc7 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Aug 7 10:20:27 2019 +0200 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionManager.java | 67 ++++++++--------- .../db/compaction/CompactionStrategyManager.java | 36 ++++------ .../cassandra/db/compaction/CompactionTasks.java | 74 +++++++++++++++++++ test/unit/org/apache/cassandra/Util.java | 10 +-- .../db/compaction/CompactionsBytemanTest.java | 84 +++++++++++++++++++++- .../db/compaction/CompactionsPurgeTest.java | 16 +++-- .../db/compaction/PendingRepairManagerTest.java | 16 +---- 8 files changed, 225 insertions(+), 79 deletions(-) diff --cc CHANGES.txt index 42080d7,dc8baf2..dad8d40 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,372 -1,5 +1,373 @@@ +4.0 + * Align load column in nodetool status output (CASSANDRA-14787) + * CassandraNetworkAuthorizer uses cached roles info (CASSANDRA-15089) + * Introduce optional timeouts for idle client sessions (CASSANDRA-11097) + * Fix AlterTableStatement dropped type validation order (CASSANDRA-15203) + * Update Netty dependencies to latest, clean up SocketFactory (CASSANDRA-15195) + * Native Transport - Apply noSpamLogger to ConnectionLimitHandler (CASSANDRA-15167) + * Reduce heap pressure during compactions (CASSANDRA-14654) + * Support building Cassandra with JDK 11 (CASSANDRA-15108) + * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710) + * Take sstable references before calculating approximate key count (CASSANDRA-14647) + * Restore snapshotting of system keyspaces on version change (CASSANDRA-14412) + * Fix AbstractBTreePartition locking in java 11 (CASSANDRA-14607) + * SimpleClient should pass connection properties as options (CASSANDRA-15056) + * Set repaired data tracking flag on range reads if enabled (CASSANDRA-15019) + * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802) + * Make TableCQLHelper 
reuse the single quote pattern (CASSANDRA-15033) + * Add Zstd compressor (CASSANDRA-14482) + * Fix IR prepare anti-compaction race (CASSANDRA-15027) + * Fix SimpleStrategy option validation (CASSANDRA-15007) + * Don't try to cancel 2i compactions when starting anticompaction (CASSANDRA-15024) + * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025) + * SSL Cert Hot Reloading should check for sanity of the new keystore/truststore before loading it (CASSANDRA-14991) + * Avoid leaking threads when failing anticompactions and rate limit anticompactions (CASSANDRA-15002) + * Validate token() arguments early instead of throwing NPE at execution (CASSANDRA-14989) + * Add a new tool to dump audit logs (CASSANDRA-14885) + * Fix generating javadoc with Java11 (CASSANDRA-14988) + * Only cancel conflicting compactions when starting anticompactions and sub range compactions (CASSANDRA-14935) + * Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938) + * Don't enable client transports when bootstrap is pending (CASSANDRA-14525) + * Make antiCompactGroup throw exception on error and anticompaction non cancellable + again (CASSANDRA-14936) + * Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849) + * Auto-expand replication_factor for NetworkTopologyStrategy (CASSANDRA-14303) + * Transient Replication: support EACH_QUORUM (CASSANDRA-14727) + * BufferPool: allocating thread for new chunks should acquire directly (CASSANDRA-14832) + * Send correct messaging version in internode messaging handshake's third message (CASSANDRA-14896) + * Make Read and Write Latency columns consistent for proxyhistograms and tablehistograms (CASSANDRA-11939) + * Make protocol checksum type option case insensitive (CASSANDRA-14716) + * Forbid re-adding static columns as regular and vice versa (CASSANDRA-14913) + * Audit log allows system keyspaces to be audited via configuration options (CASSANDRA-14498) + * Lower default chunk_length_in_kb from 64kb to 16kb 
(CASSANDRA-13241) + * Startup checker should wait for count rather than percentage (CASSANDRA-14297) + * Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862) + * Partitioned outbound internode TCP connections can occur when nodes restart (CASSANDRA-14358) + * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841) + * Avoid running query to self through messaging service (CASSANDRA-14807) + * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373) + * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759) + * ReplicaCollection follow-up (CASSANDRA-14726) + * Transient node receives full data requests (CASSANDRA-14762) + * Enable snapshot artifacts publish (CASSANDRA-12704) + * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770) + * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735) + * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780) + * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763) + * Add a check for receiving digest response from transient node (CASSANDRA-14750) + * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704) + * Remove mentions of transient replication from repair path (CASSANDRA-14698) + * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720) + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693) + * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696) + * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700) + * Detect 
inconsistencies in repaired data on the read path (CASSANDRA-14145) + * Add checksumming to the native protocol (CASSANDRA-13304) + * Make AuthCache more easily extendable (CASSANDRA-14662) + * Extend RolesCache to include detailed role info (CASSANDRA-14497) + * Add fqltool compare (CASSANDRA-14619) + * Add fqltool replay (CASSANDRA-14618) + * Log keyspace in full query log (CASSANDRA-14656) + * Transient Replication and Cheap Quorums (CASSANDRA-14404) + * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675) + * Add diagnostic events for read repairs (CASSANDRA-14668) + * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671) + * Add sampler for query time and expose with nodetool (CASSANDRA-14436) + * Clean up Message.Request implementations (CASSANDRA-14677) + * Disable old native protocol versions on demand (CASSANDRA-14659) + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664) + * Improve BTree build performance by avoiding data copy (CASSANDRA-9989) + * Make monotonic read / read repair configurable (CASSANDRA-14635) + * Refactor CompactionStrategyManager (CASSANDRA-14621) + * Flush netty client messages immediately by default (CASSANDRA-13651) + * Improve read repair blocking behavior (CASSANDRA-10726) + * Add a virtual table to expose settings (CASSANDRA-14573) + * Fix up chunk cache handling of metrics (CASSANDRA-14628) + * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652) + * Incomplete handling of exceptions when decoding incoming messages (CASSANDRA-14574) + * Add diagnostic events for user audit logging (CASSANDRA-13668) + * Allow retrieving diagnostic events via JMX (CASSANDRA-14435) + * Add base classes for diagnostic events (CASSANDRA-13457) + * Clear view system metadata when dropping keyspace (CASSANDRA-14646) + * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase (CASSANDRA-14637) + * Make all existing virtual tables use 
LocalPartitioner (CASSANDRA-14640) + * Revert 4.0 GC alg back to CMS (CASSANDRA-14636) + * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627) + * Update netty to 4.1.128 (CASSANDRA-14633) + * Add a virtual table to expose thread pools (CASSANDRA-14523) + * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626) + * Fix toDate function for timestamp arguments (CASSANDRA-14502) + * Revert running dtests by default in circleci (CASSANDRA-14614) + * Stream entire SSTables when possible (CASSANDRA-14556) + * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592) + * Add experimental support for Java 11 (CASSANDRA-9608) + * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580) + * Improve logging in MessageInHandler's constructor (CASSANDRA-14576) + * Set broadcast address in internode messaging handshake (CASSANDRA-14579) + * Wait for schema agreement prior to building MVs (CASSANDRA-14571) + * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426) + * Bump the hints messaging version to match the current one (CASSANDRA-14536) + * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546) + * Report why native_transport_port fails to bind (CASSANDRA-14544) + * Optimize internode messaging protocol (CASSANDRA-14485) + * Internode messaging handshake sends wrong messaging version number (CASSANDRA-14540) + * Add a virtual table to expose active client connections (CASSANDRA-14458) + * Clean up and refactor client metrics (CASSANDRA-14524) + * Nodetool import row cache invalidation races with adding sstables to tracker (CASSANDRA-14529) + * Fix assertions in LWTs after TableMetadata was made immutable (CASSANDRA-14356) + * Abort compactions quicker (CASSANDRA-14397) + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529) + * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509) + * 
Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467) + * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457) + * Let nodetool import take a list of directories (CASSANDRA-14442) + * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488) + * Implement virtual keyspace interface (CASSANDRA-7622) + * nodetool import cleanup and improvements (CASSANDRA-14417) + * Bump jackson version to >= 2.9.5 (CASSANDRA-14427) + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360) + * Audit logging for database activity (CASSANDRA-12151) + * Clean up build artifacts in docs container (CASSANDRA-14432) + * Minor network authz improvements (CASSANDRA-14413) + * Automatic sstable upgrades (CASSANDRA-14197) + * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431) + * Cassandra-stress throws NPE if insert section isn't specified in user profile (CASSANDRA-14426) + * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335) + * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281) + * Add network authz (CASSANDRA-13985) + * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389) + * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381) + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402) + * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392) + * Client TOPOLOGY_CHANGE messages have wrong port. 
(CASSANDRA-14398) + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719) + * Eliminate background repair and probabilistic read_repair_chance table options + (CASSANDRA-13910) + * Bind to correct local address in 4.0 streaming (CASSANDRA-14362) + * Use standard Amazon naming for datacenter and rack in Ec2Snitch (CASSANDRA-7839) + * Fix junit failure for SSTableReaderTest (CASSANDRA-14387) + * Abstract write path for pluggable storage (CASSANDRA-14118) + * nodetool describecluster should be more informative (CASSANDRA-13853) + * Compaction performance improvements (CASSANDRA-14261) + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260) + * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889) + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354) + * Add support for hybrid MIN(), MAX() speculative retry policies + (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352) + * Fix some regressions caused by 14058 (CASSANDRA-14353) + * Abstract repair for pluggable storage (CASSANDRA-14116) + * Add meaningful toString() impls (CASSANDRA-13653) + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884) + * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713) + * Add coordinator write metric per CF (CASSANDRA-14232) + * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314) + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315) + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322) + * Add ability to specify driver name and version (CASSANDRA-14275) + * Abstract streaming for pluggable storage (CASSANDRA-14115) + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294) + * Use Murmur3 for validation compactions (CASSANDRA-14002) + * Comma at the end of the seed list is interpreted as localhost (CASSANDRA-14285) + * Refactor read executor and response 
resolver, abstract read repair (CASSANDRA-14058) + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993) + * Add a few options to nodetool verify (CASSANDRA-14201) + * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183) + * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259) + * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226) + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132) + * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214) + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174) + * Add nodetool clientlist (CASSANDRA-13665) + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211) + * Non-disruptive seed node list reload (CASSANDRA-14190) + * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185) + * Migrate dtests to use pytest and python3 (CASSANDRA-14134) + * Allow storage port to be configurable per node (CASSANDRA-7544) + * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182) + * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067) + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152) + * Delete temp test files on exit (CASSANDRA-14153) + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867) + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066) + * Fix cassandra-stress startup failure (CASSANDRA-14106) + * Remove initialDirectories from CFS (CASSANDRA-13928) + * Fix trivial log format error (CASSANDRA-14015) + * Allow sstabledump to do a json object per partition (CASSANDRA-13848) + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200) + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081) + * Fix 
Distribution.average in cassandra-stress (CASSANDRA-14090) + * Support a means of logging all queries as they were invoked (CASSANDRA-13983) + * Presize collections (CASSANDRA-13760) + * Add GroupCommitLogService (CASSANDRA-13530) + * Parallelize initial materialized view build (CASSANDRA-12245) + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965) + * Make LWTs send resultset metadata on every request (CASSANDRA-13992) + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963) + * Introduce leaf-only iterator (CASSANDRA-9988) + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997) + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182) + * Refactoring to specialised functional interfaces (CASSANDRA-13982) + * Speculative retry should allow more friendly params (CASSANDRA-13876) + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944) + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291) + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728) + * Fix some alerts raised by static analysis (CASSANDRA-13799) + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593) + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786) + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941) + * Expose recent histograms in JmxHistograms (CASSANDRA-13642) + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899) + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906) + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925) + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961) + * Correctly close netty channels when a stream session ends (CASSANDRA-13905) + * Update lz4 to 1.4.0 (CASSANDRA-13741) + * Optimize Paxos prepare and propose stage for local requests 
(CASSANDRA-13862) + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299) + * Use compaction threshold for STCS in L0 (CASSANDRA-13861) + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703) + * Add extra information to SASI timeout exception (CASSANDRA-13677) + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818) + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786) + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846) + * Add keyspace and table name in schema validation exception (CASSANDRA-13845) + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771) + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837) + * Race condition when closing stream sessions (CASSANDRA-13852) + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831) + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696) + * Add stress profile yaml with LWT (CASSANDRA-7960) + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789) + * Simplify mx4j configuration (CASSANDRA-13578) + * Fix trigger example on 4.0 (CASSANDRA-13796) + * Force minimum timeout value (CASSANDRA-9375) + * Use netty for streaming (CASSANDRA-12229) + * Use netty for internode messaging (CASSANDRA-8457) + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774) + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758) + * Fix pending repair manager index out of bounds check (CASSANDRA-13769) + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576) + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664) + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594) + * Fix race / ref leak in anticompaction (CASSANDRA-13688) + * Expose tasks queue 
length via JMX (CASSANDRA-12758) + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751) + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615) + * Improve sstablemetadata output (CASSANDRA-11483) + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371) + * Introduce error metrics for repair (CASSANDRA-13387) + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732) + * Update metrics to 3.1.5 (CASSANDRA-13648) + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699) + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725) + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727) + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996) + * Default for start_native_transport now true if not set in config (CASSANDRA-13656) + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583) + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148) + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271) + * Use common nowInSec for validation compactions (CASSANDRA-13671) + * Improve handling of IR prepare failures (CASSANDRA-13672) + * Send IR coordinator messages synchronously (CASSANDRA-13673) + * Flush system.repair table before IR finalize promise (CASSANDRA-13660) + * Fix column filter creation for wildcard queries (CASSANDRA-13650) + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614) + * fix race condition in PendingRepairManager (CASSANDRA-13659) + * Allow noop incremental repair state transitions (CASSANDRA-13658) + * Run repair with down replicas (CASSANDRA-10446) + * Added started & completed repair metrics (CASSANDRA-13598) + * Added started & completed repair metrics (CASSANDRA-13598) + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130) + * Improve calculation of available 
disk space for compaction (CASSANDRA-13068) + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579) + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570) + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585) + * Fix Randomness of stress values (CASSANDRA-12744) + * Allow selecting Map values and Set elements (CASSANDRA-7396) + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444) + * Update repairTime for keyspaces on completion (CASSANDRA-13539) + * Add configurable upper bound for validation executor threads (CASSANDRA-13521) + * Bring back maxHintTTL property (CASSANDRA-12982) + * Add testing guidelines (CASSANDRA-13497) + * Add more repair metrics (CASSANDRA-13531) + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650) + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367) + * Log time elapsed for each incremental repair phase (CASSANDRA-13498) + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780) + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262) + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421) + * Improve incremental repair logging (CASSANDRA-13468) + * Start compaction when incremental repair finishes (CASSANDRA-13454) + * Add repair streaming preview (CASSANDRA-13257) + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430) + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661) + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354) + * Skip building views during base table streams on range movements (CASSANDRA-13065) + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197) + * Remove deprecated repair JMX APIs 
(CASSANDRA-11530) + * Fix version check to enable streaming keep-alive (CASSANDRA-12929) + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289) + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324) + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360) + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359) + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336) + * Incremental repair not streaming correct sstables (CASSANDRA-13328) + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300) + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132) + * Remove config option index_interval (CASSANDRA-10671) + * Reduce lock contention for collection types and serializers (CASSANDRA-13271) + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283) + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292) + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520) + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226) + * Remove unused method (CASSANDRA-13227) + * Fix minor bugs related to #9143 (CASSANDRA-13217) + * Output warning if user increases RF (CASSANDRA-13079) + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081) + * Add support for + and - operations on dates (CASSANDRA-11936) + * Fix consistency of incrementally repaired data (CASSANDRA-9143) + * Increase commitlog version (CASSANDRA-13161) + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425) + * Refactor ColumnCondition (CASSANDRA-12981) + * Parallelize streaming of different keyspaces (CASSANDRA-4663) + * Improved compactions metrics (CASSANDRA-13015) + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031) + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855) + * Thrift removal (CASSANDRA-11115) + * Remove pre-3.0 
compatibility code for 4.0 (CASSANDRA-12716) + * Add column definition kind to dropped columns in schema (CASSANDRA-12705) + * Add (automate) Nodetool Documentation (CASSANDRA-12672) + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736) + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681) + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422) + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080) + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084) + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510) + * Allow IN restrictions on column families with collections (CASSANDRA-12654) + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028) + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029) + * Add mutation size and batch metrics (CASSANDRA-12649) + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999) + * Expose time spent waiting in thread pool queue (CASSANDRA-8398) + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969) + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946) + * Add support for arithmetic operators (CASSANDRA-11935) + * Add histogram for delay to deliver hints (CASSANDRA-13234) + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720) + * Trivial format error in StorageProxy (CASSANDRA-13551) + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480) + * Anticompaction can cause noisy log messages (CASSANDRA-13684) + * Switch to client init for sstabledump (CASSANDRA-13683) + * CQLSH: Don't pause when capturing data (CASSANDRA-13743) + * nodetool clearsnapshot requires --all to clear all snapshots 
(CASSANDRA-13391) + * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527) + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243) + + 3.11.5 + * Make sure user defined compaction transactions are always closed (CASSANDRA-15123) * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305) * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903) * Add flag to disable SASI indexes, and warnings on creation (CASSANDRA-14866) diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index d387701,7086d77..896fa2a --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -817,14 -745,14 +817,15 @@@ public class CompactionManager implemen FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()), splitOutput)); } ++ @SuppressWarnings("resource") // the tasks are executed in parallel on the executor, making sure that they get closed public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput) { // here we compute the task off the compaction executor, so having that present doesn't // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting // for ourselves to finish/acknowledge cancellation before continuing. 
-- final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput); ++ CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput); -- if (tasks == null) ++ if (tasks.isEmpty()) return Collections.emptyList(); List<Future<?>> futures = new ArrayList<>(); @@@ -850,42 -778,45 +851,42 @@@ if (nonEmptyTasks > 1) logger.info("Major compaction will not result in a single sstable - repaired and unrepaired data is kept separate and compaction runs per data_file_directory."); -- return futures; } public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges) { - final Collection<AbstractCompactionTask> tasks = cfStore.runWithCompactionsDisabled(() -> - { - Collection<SSTableReader> sstables = sstablesInBounds(cfStore, ranges); - if (sstables == null || sstables.isEmpty()) - { - logger.debug("No sstables found for the provided token range"); - return null; - } - return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds())); - }, (sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges), false, false, false); - - if (tasks == null) - return; - - Runnable runnable = new WrappedRunnable() - { - protected void runMayThrow() - Callable<Collection<AbstractCompactionTask>> taskCreator = () -> { ++ Callable<CompactionTasks> taskCreator = () -> { + Collection<SSTableReader> sstables = sstablesInBounds(cfStore, ranges); + if (sstables == null || sstables.isEmpty()) { - for (AbstractCompactionTask task : tasks) - if (task != null) - task.execute(active); + logger.debug("No sstables found for the provided token range"); - return null; ++ return CompactionTasks.empty(); } + return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds())); }; - if (executor.isShutdown()) - 
final Collection<AbstractCompactionTask> tasks = cfStore.runWithCompactionsDisabled(taskCreator, false, false); - - if (tasks == null) - return; - - Runnable runnable = new WrappedRunnable() ++ try (CompactionTasks tasks = cfStore.runWithCompactionsDisabled(taskCreator, ++ (sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges), ++ false, ++ false, ++ false)) { - logger.info("Compaction executor has shut down, not submitting task"); - return; - protected void runMayThrow() throws Exception ++ if (tasks.isEmpty()) ++ return; ++ ++ Runnable runnable = new WrappedRunnable() + { - try ++ protected void runMayThrow() + { + for (AbstractCompactionTask task : tasks) + if (task != null) - task.execute(metrics); ++ task.execute(active); + } - finally - { - FBUtilities.closeAll(tasks.stream().map(task -> task.transaction).collect(Collectors.toList())); - } - } - }; ++ }; + - FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, "force compaction for token range")); ++ FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, "force compaction for token range")); + } - FBUtilities.waitOnFuture(executor.submit(runnable)); } private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> tokenRangeCollection) @@@ -1015,12 -943,19 +1016,14 @@@ } else { -- List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore); - for (AbstractCompactionTask task : tasks) - try ++ try (CompactionTasks tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore)) { - if (task != null) - task.execute(active); + for (AbstractCompactionTask task : tasks) + { + if (task != null) - task.execute(metrics); ++ task.execute(active); + } } - finally - { - FBUtilities.closeAll(tasks.stream().map(task -> task.transaction).collect(Collectors.toList())); - } } } }; diff --cc 
src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index b978641,86170a1..fd4dbeb --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -18,18 -18,8 +18,17 @@@ package org.apache.cassandra.db.compaction; -import java.util.*; -import java.util.concurrent.Callable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; - import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import java.util.stream.Collectors; @@@ -954,36 -805,45 +953,29 @@@ public class CompactionStrategyManager { readLock.unlock(); } - } -- public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput) ++ public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput) { maybeReloadDiskBoundaries(); // runWithCompactionsDisabled cancels active compactions and disables them, then we are able // to make the repaired/unrepaired strategies mark their own sstables as compacting. 
Once the // sstables are marked the compactions are re-enabled -- return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>() -- { -- @Override -- public Collection<AbstractCompactionTask> call() ++ return cfs.runWithCompactionsDisabled(() -> { ++ List<AbstractCompactionTask> tasks = new ArrayList<>(); ++ readLock.lock(); ++ try { -- List<AbstractCompactionTask> tasks = new ArrayList<>(); -- readLock.lock(); -- try -- { - for (AbstractStrategyHolder holder : holders) - for (AbstractCompactionStrategy strategy : repaired) -- { - tasks.addAll(holder.getMaximalTasks(gcBefore, splitOutput)); - Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput); - if (task != null) - tasks.addAll(task); - } - for (AbstractCompactionStrategy strategy : unrepaired) - { - Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput); - if (task != null) - tasks.addAll(task); -- } -- } -- finally ++ for (AbstractStrategyHolder holder : holders) { -- readLock.unlock(); ++ tasks.addAll(holder.getMaximalTasks(gcBefore, splitOutput)); } -- if (tasks.isEmpty()) -- return null; -- return tasks; } ++ finally ++ { ++ readLock.unlock(); ++ } ++ return CompactionTasks.create(tasks); }, false, false); } @@@ -996,19 -856,37 +988,19 @@@ * @param gcBefore gc grace period, throw away tombstones older than this * @return a list of compaction tasks corresponding to the sstables requested */ -- public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore) - { - return getUserDefinedTasks(sstables, gcBefore, false); - } - - public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore, boolean validateForCompaction) ++ public CompactionTasks getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore) { maybeReloadDiskBoundaries(); List<AbstractCompactionTask> ret = new ArrayList<>(); readLock.lock(); try { - if 
(validateForCompaction) - validateForCompaction(sstables); - - Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream() - .filter(s -> !s.isMarkedSuspect() && s.isRepaired()) - .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s))); - - Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream() - .filter(s -> !s.isMarkedSuspect() && !s.isRepaired()) - .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s))); - - - for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet()) - ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore)); - - for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet()) - ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore)); - - return ret; + List<GroupedSSTableContainer> groupedSSTables = groupSSTables(sstables); + for (int i = 0; i < holders.size(); i++) + { + ret.addAll(holders.get(i).getUserDefinedTasks(groupedSSTables.get(i), gcBefore)); + } - return ret; ++ return CompactionTasks.create(ret); } finally { diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTasks.java index 0000000,0000000..af0dbd0 new file mode 100644 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTasks.java @@@ -1,0 -1,0 +1,74 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.cassandra.db.compaction; ++ ++import java.util.AbstractCollection; ++import java.util.Collection; ++import java.util.Collections; ++import java.util.Iterator; ++import java.util.stream.Collectors; ++ ++import org.apache.cassandra.utils.FBUtilities; ++ ++public class CompactionTasks extends AbstractCollection<AbstractCompactionTask> implements AutoCloseable ++{ ++ @SuppressWarnings("resource") ++ private static final CompactionTasks EMPTY = new CompactionTasks(Collections.emptyList()); ++ ++ private final Collection<AbstractCompactionTask> tasks; ++ ++ private CompactionTasks(Collection<AbstractCompactionTask> tasks) ++ { ++ this.tasks = tasks; ++ } ++ ++ public static CompactionTasks create(Collection<AbstractCompactionTask> tasks) ++ { ++ if (tasks == null || tasks.isEmpty()) ++ return EMPTY; ++ return new CompactionTasks(tasks); ++ } ++ ++ public static CompactionTasks empty() ++ { ++ return EMPTY; ++ } ++ ++ public Iterator<AbstractCompactionTask> iterator() ++ { ++ return tasks.iterator(); ++ } ++ ++ public int size() ++ { ++ return tasks.size(); ++ } ++ ++ public void close() ++ { ++ try ++ { ++ FBUtilities.closeAll(tasks.stream().map(task -> task.transaction).collect(Collectors.toList())); ++ } ++ catch (Exception e) ++ { ++ throw new RuntimeException(e); ++ } ++ } ++} diff --cc test/unit/org/apache/cassandra/Util.java index df45f3c,006cd76..3dcaff7 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@@ -41,14 -39,8 +41,14 @@@ import org.apache.commons.lang3.StringU 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.compaction.ActiveCompactionsTracker; ++import org.apache.cassandra.db.compaction.CompactionTasks; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnIdentifier; @@@ -247,9 -241,9 +247,11 @@@ public class Uti public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds()); -- List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore); -- for (AbstractCompactionTask task : tasks) - task.execute(ActiveCompactionsTracker.NOOP); - task.execute(null); ++ try (CompactionTasks tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore)) ++ { ++ for (AbstractCompactionTask task : tasks) ++ task.execute(ActiveCompactionsTracker.NOOP); ++ } } public static void expectEOF(Callable<?> callable) diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java index 2519389,d5f2800..95069f1 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java @@@ -18,16 -18,27 +18,24 @@@ package org.apache.cassandra.db.compaction; -import java.io.IOException; +import java.util.concurrent.TimeUnit; + import java.util.Collection; + import java.util.Collections; -import java.util.HashSet; -import java.util.Set; + import java.util.function.Consumer; + import 
java.util.stream.Collectors; import org.junit.Test; import org.junit.runner.RunWith; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.dht.Range; + import org.apache.cassandra.dht.Token; + import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.utils.FBUtilities; import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import static org.junit.Assert.assertEquals; @@@ -124,21 -69,87 +134,93 @@@ public class CompactionsBytemanTest ext assertEquals(0, CompactionManager.instance.compactingCF.count(cfs)); } + private void createPossiblyExpiredSSTable(final ColumnFamilyStore cfs, final boolean expired) throws Throwable + { + if (expired) + { + execute("INSERT INTO %s (id, val) values (1, 'expired') USING TTL 1"); + Thread.sleep(TimeUnit.SECONDS.toMillis((long)1.5)); + } + else + { + execute("INSERT INTO %s (id, val) values (2, 'immortal')"); + } + cfs.forceBlockingFlush(); + } + + private void createLowGCGraceTable(){ + createTable("CREATE TABLE %s (id int PRIMARY KEY, val text) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds=0"); + } - } ++ + @Test + @BMRule(name = "Stop all compactions", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "AT INVOKE getCompactionAwareWriter", + action = "$ci.stop()") + public void testStopUserDefinedCompactionRepaired() throws Throwable + { + testStopCompactionRepaired((cfs) -> { + Collection<Descriptor> files = cfs.getLiveSSTables().stream().map(s -> s.descriptor).collect(Collectors.toList()); + FBUtilities.waitOnFuture(CompactionManager.instance.submitUserDefined(cfs, files, CompactionManager.NO_GC)); + 
}); + } + + @Test + @BMRule(name = "Stop all compactions", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "AT INVOKE getCompactionAwareWriter", + action = "$ci.stop()") + public void testStopSubRangeCompactionRepaired() throws Throwable + { + testStopCompactionRepaired((cfs) -> { + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(cfs.getPartitioner().getMinimumToken(), + cfs.getPartitioner().getMaximumToken())); + CompactionManager.instance.forceCompactionForTokenRange(cfs, ranges); + }); + } + + public void testStopCompactionRepaired(Consumer<ColumnFamilyStore> compactionRunner) throws Throwable + { + String table = createTable("CREATE TABLE %s (k INT, c INT, v INT, PRIMARY KEY (k, c))"); + ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(table); + cfs.disableAutoCompaction(); + for (int i = 0; i < 5; i++) + { + for (int j = 0; j < 10; j++) + { + execute("insert into %s (k, c, v) values (?, ?, ?)", i, j, i*j); + } + cfs.forceBlockingFlush(); + } - setRepaired(cfs, cfs.getLiveSSTables()); ++ cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), System.currentTimeMillis(), null, false); + for (int i = 0; i < 5; i++) + { + for (int j = 0; j < 10; j++) + { + execute("insert into %s (k, c, v) values (?, ?, ?)", i, j, i*j); + } + cfs.forceBlockingFlush(); + } + + assertTrue(cfs.getTracker().getCompacting().isEmpty()); - assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h -> h.getCompactionInfo().getCFMetaData().equals(cfs.metadata))); ++ assertTrue(CompactionManager.instance.active.getCompactions().stream().noneMatch(h -> h.getCompactionInfo().getTableMetadata().equals(cfs.metadata))); + + try + { + compactionRunner.accept(cfs); + fail("compaction should fail"); + } + catch (RuntimeException t) + { + if (!(t.getCause().getCause() instanceof CompactionInterruptedException)) + throw t; + //expected + } + + 
assertTrue(cfs.getTracker().getCompacting().isEmpty()); - assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h -> h.getCompactionInfo().getCFMetaData().equals(cfs.metadata))); - - } ++ assertTrue(CompactionManager.instance.active.getCompactions().stream().noneMatch(h -> h.getCompactionInfo().getTableMetadata().equals(cfs.metadata))); + - private void setRepaired(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables) throws IOException - { - Set<SSTableReader> changed = new HashSet<>(); - for (SSTableReader sstable: sstables) - { - sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, System.currentTimeMillis()); - sstable.reloadSSTableMetadata(); - changed.add(sstable); - } - cfs.getTracker().notifySSTableRepairedStatusChanged(changed); + } + } diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index dcd5270,f5b1641..a0d52aa --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@@ -19,9 -19,9 +19,9 @@@ package org.apache.cassandra.db.compaction; import java.util.Collection; --import java.util.List; import java.util.concurrent.ExecutionException; ++import com.google.common.collect.Iterables; import org.junit.BeforeClass; import org.junit.Test; @@@ -303,9 -298,9 +303,10 @@@ public class CompactionsPurgeTes .build().applyUnsafe(); cfs.forceBlockingFlush(); -- List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE); -- assertEquals(1, tasks.size()); - tasks.get(0).execute(ActiveCompactionsTracker.NOOP); - tasks.get(0).execute(null); ++ try (CompactionTasks tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE)) ++ { ++ Iterables.getOnlyElement(tasks).execute(ActiveCompactionsTracker.NOOP); ++ } // verify that minor compaction does GC when key is provably not // present in a 
non-compacted sstable @@@ -354,9 -349,9 +355,10 @@@ cfs.forceBlockingFlush(); // compact the sstables with the c1/c2 data and the c1 tombstone -- List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE); -- assertEquals(1, tasks.size()); - tasks.get(0).execute(ActiveCompactionsTracker.NOOP); - tasks.get(0).execute(null); ++ try (CompactionTasks tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE)) ++ { ++ Iterables.getOnlyElement(tasks).execute(ActiveCompactionsTracker.NOOP); ++ } // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone. diff --cc test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java index 4e645fd,0000000..9f4cf8d mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java @@@ -1,318 -1,0 +1,308 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.util.Collection; +import java.util.Collections; - import java.util.List; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.consistent.LocalSessionAccessor; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +public class PendingRepairManagerTest extends AbstractPendingRepairTest +{ + /** + * If a local session is ongoing, it should not be cleaned up + */ + @Test + public void needsCleanupInProgress() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + + Assert.assertFalse(prm.canCleanup(repairID)); + } + + /** + * If a local session is finalized, it should be cleaned up + */ + @Test + public void needsCleanupFinalized() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertTrue(prm.canCleanup(repairID)); + } + + /** + * If a local session has failed, it should be cleaned up + */ + @Test + public void needsCleanupFailed() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + 
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.failUnsafe(repairID); + + Assert.assertTrue(prm.canCleanup(repairID)); + } + + @Test + public void needsCleanupNoSession() + { + UUID fakeID = UUIDGen.getTimeUUID(); + PendingRepairManager prm = new PendingRepairManager(cfs, null, false); + Assert.assertTrue(prm.canCleanup(fakeID)); + } + + @Test + public void estimateRemainingTasksInProgress() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + + Assert.assertEquals(0, prm.getEstimatedRemainingTasks()); + Assert.assertEquals(0, prm.getNumPendingRepairFinishedTasks()); + } + + @Test + public void estimateRemainingFinishedRepairTasks() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertEquals(0, prm.getEstimatedRemainingTasks()); + Assert.assertEquals(1, prm.getNumPendingRepairFinishedTasks()); + } + + @Test + public void getNextBackgroundTask() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + 
SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + + repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertEquals(2, prm.getSessions().size()); + Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds())); + AbstractCompactionTask compactionTask = prm.getNextRepairFinishedTask(); + try + { + Assert.assertNotNull(compactionTask); + Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); + PendingRepairManager.RepairFinishedCompactionTask cleanupTask = (PendingRepairManager.RepairFinishedCompactionTask) compactionTask; + Assert.assertEquals(repairID, cleanupTask.getSessionID()); + } + finally + { + compactionTask.transaction.abort(); + } + } + + @Test + public void getNextBackgroundTaskNoSessions() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds())); + } + + /** + * If all sessions should be cleaned up, getNextBackgroundTask should return null + */ + @Test + public void getNextBackgroundTaskAllCleanup() throws Exception + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds())); + + } + + @Test + public void maximalTaskNeedsCleanup() + { + PendingRepairManager 
prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Collection<AbstractCompactionTask> tasks = prm.getMaximalTasks(FBUtilities.nowInSeconds(), false); + try + { + Assert.assertEquals(1, tasks.size()); + } + finally + { + tasks.stream().forEach(t -> t.transaction.abort()); + } + } + + @Test + public void userDefinedTaskTest() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + UUID repairId = registerSession(cfs, true, true); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairId, false); + prm.addSSTable(sstable); - List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Collections.singleton(sstable), 100); - try ++ ++ try (CompactionTasks tasks = csm.getUserDefinedTasks(Collections.singleton(sstable), 100)) + { + Assert.assertEquals(1, tasks.size()); + } - finally - { - tasks.stream().forEach(t -> t.transaction.abort()); - } + } + + @Test + public void mixedPendingSessionsTest() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + UUID repairId = registerSession(cfs, true, true); + UUID repairId2 = registerSession(cfs, true, true); + SSTableReader sstable = makeSSTable(true); + SSTableReader sstable2 = makeSSTable(true); + + mutateRepaired(sstable, repairId, false); + mutateRepaired(sstable2, repairId2, false); + prm.addSSTable(sstable); + prm.addSSTable(sstable2); - List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100); - try ++ try (CompactionTasks tasks = csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100)) + { + Assert.assertEquals(2, tasks.size()); + 
} - finally - { - tasks.stream().forEach(t -> t.transaction.abort()); - } + } + + /** + * Tests that a IllegalSSTableArgumentException is thrown if we try to get + * scanners for an sstable that isn't pending repair + */ + @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class) + public void getScannersInvalidSSTable() throws Exception + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + SSTableReader sstable = makeSSTable(true); + prm.getScanners(Collections.singleton(sstable), Collections.singleton(RANGE1)); + } + + /** + * Tests that a IllegalSSTableArgumentException is thrown if we try to get + * scanners for an sstable that isn't pending repair + */ + @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class) + public void getOrCreateInvalidSSTable() throws Exception + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + SSTableReader sstable = makeSSTable(true); + prm.getOrCreate(sstable); + } + + @Test + public void sessionHasData() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + Assert.assertFalse(prm.hasDataForSession(repairID)); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID, false); + prm.addSSTable(sstable); + Assert.assertTrue(prm.hasDataForSession(repairID)); + } + + @Test + public void noEmptyCompactionTask() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + SSTableReader sstable = makeSSTable(false); + UUID id = UUID.randomUUID(); + mutateRepaired(sstable, id, false); + prm.getOrCreate(sstable); + cfs.truncateBlocking(); + Assert.assertFalse(cfs.getSSTables(SSTableSet.LIVE).iterator().hasNext()); + Assert.assertNull(cfs.getCompactionStrategyManager().getNextBackgroundTask(0)); + + } +} --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org