This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit f9b46aeb3123e3d387837aa8525126b0a450851b Merge: f6c1dea 4b547f1 Author: Marcus Eriksson <[email protected]> AuthorDate: Thu Oct 31 14:34:06 2019 +0100 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 5 ++ .../org/apache/cassandra/db/ColumnFamilyStore.java | 43 +++++++++++-- .../cassandra/db/compaction/CompactionInfo.java | 8 ++- .../db/compaction/CompactionIterator.java | 5 ++ .../cassandra/db/compaction/CompactionManager.java | 26 ++++++++ .../cassandra/db/compaction/CompactionTask.java | 10 +-- .../apache/cassandra/db/compaction/Scrubber.java | 5 ++ .../apache/cassandra/db/compaction/Verifier.java | 5 ++ .../apache/cassandra/db/view/ViewBuilderTask.java | 5 ++ .../cassandra/index/SecondaryIndexBuilder.java | 5 ++ .../cassandra/io/sstable/IndexSummaryManager.java | 2 + .../io/sstable/IndexSummaryRedistribution.java | 6 ++ .../db/compaction/CancelCompactionsTest.java | 72 +++++++++++++++++++++- .../db/repair/PendingAntiCompactionTest.java | 15 +++++ .../io/sstable/IndexSummaryManagerTest.java | 46 +++++++++++++- 16 files changed, 246 insertions(+), 13 deletions(-) diff --cc CHANGES.txt index f7bb517,4240841..201ab60 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -2,400 -5,9 +2,401 @@@ Merged from 2.2: * In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371) +4.0-alpha2 + * Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169) + * Upgrade Guava to 27, and to java-driver 3.6.0 (from 3.4.0-SNAPSHOT) (CASSANDRA-14655) + * Extract an AbstractCompactionController to allow for custom implementations (CASSANDRA-15286) + * Move chronicle-core version from snapshot to stable, and include carrotsearch in generated pom.xml (CASSANDRA-15321) + * Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly (CASSANDRA-15163) + * Add `allocate_tokens_for_local_replication_factor` option for token allocation (CASSANDRA-15260) + * Add Alibaba 
Cloud Platform snitch (CASSANDRA-15092) +Merged from 3.0: ++ * Make sure index summary redistribution does not start when compactions are paused (CASSANDRA-15265) + * Add ability to cap max negotiable protocol version (CASSANDRA-15193) + * Gossip tokens on startup if available (CASSANDRA-15335) + * Fix resource leak in CompressedSequentialWriter (CASSANDRA-15340) + * Fix bad merge that reverted CASSANDRA-14993 (CASSANDRA-15289) + * Add support for network topology and query tracing for inJVM dtest (CASSANDRA-15319) + + +4.0-alpha1 + * Inaccurate exception message with nodetool snapshot (CASSANDRA-15287) + * Fix InternodeOutboundMetrics overloaded bytes/count mixup (CASSANDRA-15186) + * Enhance & reenable RepairTest with compression=off and compression=on (CASSANDRA-15272) + * Improve readability of Table metrics Virtual tables units (CASSANDRA-15194) + * Fix error with non-existent table for nodetool tablehistograms (CASSANDRA-14410) + * Avoid result truncation in decimal operations (CASSANDRA-15232) + * Catch non-IOException in FileUtils.close to make sure that all resources are closed (CASSANDRA-15225) + * Align load column in nodetool status output (CASSANDRA-14787) + * CassandraNetworkAuthorizer uses cached roles info (CASSANDRA-15089) + * Introduce optional timeouts for idle client sessions (CASSANDRA-11097) + * Fix AlterTableStatement dropped type validation order (CASSANDRA-15203) + * Update Netty dependencies to latest, clean up SocketFactory (CASSANDRA-15195) + * Native Transport - Apply noSpamLogger to ConnectionLimitHandler (CASSANDRA-15167) + * Reduce heap pressure during compactions (CASSANDRA-14654) + * Support building Cassandra with JDK 11 (CASSANDRA-15108) + * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710) + * Take sstable references before calculating approximate key count (CASSANDRA-14647) + * Restore snapshotting of system keyspaces on version change (CASSANDRA-14412) + * Fix AbstractBTreePartition locking in java 11 
(CASSANDRA-14607) + * SimpleClient should pass connection properties as options (CASSANDRA-15056) + * Set repaired data tracking flag on range reads if enabled (CASSANDRA-15019) + * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802) + * Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033) + * Add Zstd compressor (CASSANDRA-14482) + * Fix IR prepare anti-compaction race (CASSANDRA-15027) + * Fix SimpleStrategy option validation (CASSANDRA-15007) + * Don't try to cancel 2i compactions when starting anticompaction (CASSANDRA-15024) + * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025) + * SSL Cert Hot Reloading should check for sanity of the new keystore/truststore before loading it (CASSANDRA-14991) + * Avoid leaking threads when failing anticompactions and rate limit anticompactions (CASSANDRA-15002) + * Validate token() arguments early instead of throwing NPE at execution (CASSANDRA-14989) + * Add a new tool to dump audit logs (CASSANDRA-14885) + * Fix generating javadoc with Java11 (CASSANDRA-14988) + * Only cancel conflicting compactions when starting anticompactions and sub range compactions (CASSANDRA-14935) + * Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938) + * Don't enable client transports when bootstrap is pending (CASSANDRA-14525) + * Make antiCompactGroup throw exception on error and anticompaction non cancellable + again (CASSANDRA-14936) + * Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849) + * Auto-expand replication_factor for NetworkTopologyStrategy (CASSANDRA-14303) + * Transient Replication: support EACH_QUORUM (CASSANDRA-14727) + * BufferPool: allocating thread for new chunks should acquire directly (CASSANDRA-14832) + * Send correct messaging version in internode messaging handshake's third message (CASSANDRA-14896) + * Make Read and Write Latency columns consistent for proxyhistograms and tablehistograms (CASSANDRA-11939) + * Make protocol checksum type option 
case insensitive (CASSANDRA-14716) + * Forbid re-adding static columns as regular and vice versa (CASSANDRA-14913) + * Audit log allows system keyspaces to be audited via configuration options (CASSANDRA-14498) + * Lower default chunk_length_in_kb from 64kb to 16kb (CASSANDRA-13241) + * Startup checker should wait for count rather than percentage (CASSANDRA-14297) + * Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862) + * Partitioned outbound internode TCP connections can occur when nodes restart (CASSANDRA-14358) + * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841) + * Avoid running query to self through messaging service (CASSANDRA-14807) + * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373) + * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759) + * ReplicaCollection follow-up (CASSANDRA-14726) + * Transient node receives full data requests (CASSANDRA-14762) + * Enable snapshot artifacts publish (CASSANDRA-12704) + * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770) + * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735) + * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780) + * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763) + * Add a check for receiving digest response from transient node (CASSANDRA-14750) + * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704) + * Remove mentions of transient replication from repair path (CASSANDRA-14698) + * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720) + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693) + * 
DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696) + * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700) + * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145) + * Add checksumming to the native protocol (CASSANDRA-13304) + * Make AuthCache more easily extendable (CASSANDRA-14662) + * Extend RolesCache to include detailed role info (CASSANDRA-14497) + * Add fqltool compare (CASSANDRA-14619) + * Add fqltool replay (CASSANDRA-14618) + * Log keyspace in full query log (CASSANDRA-14656) + * Transient Replication and Cheap Quorums (CASSANDRA-14404) + * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675) + * Add diagnostic events for read repairs (CASSANDRA-14668) + * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671) + * Add sampler for query time and expose with nodetool (CASSANDRA-14436) + * Clean up Message.Request implementations (CASSANDRA-14677) + * Disable old native protocol versions on demand (CASSANDRA-14659) + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664) + * Improve BTree build performance by avoiding data copy (CASSANDRA-9989) + * Make monotonic read / read repair configurable (CASSANDRA-14635) + * Refactor CompactionStrategyManager (CASSANDRA-14621) + * Flush netty client messages immediately by default (CASSANDRA-13651) + * Improve read repair blocking behavior (CASSANDRA-10726) + * Add a virtual table to expose settings (CASSANDRA-14573) + * Fix up chunk cache handling of metrics (CASSANDRA-14628) + * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652) + * Incomplete handling of exceptions when decoding incoming messages (CASSANDRA-14574) + * Add diagnostic events for user audit logging (CASSANDRA-13668) + * Allow retrieving 
diagnostic events via JMX (CASSANDRA-14435) + * Add base classes for diagnostic events (CASSANDRA-13457) + * Clear view system metadata when dropping keyspace (CASSANDRA-14646) + * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase (CASSANDRA-14637) + * Make all existing virtual tables use LocalPartitioner (CASSANDRA-14640) + * Revert 4.0 GC alg back to CMS (CASSANDRA-14636) + * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627) + * Update netty to 4.1.28 (CASSANDRA-14633) + * Add a virtual table to expose thread pools (CASSANDRA-14523) + * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626) + * Fix toDate function for timestamp arguments (CASSANDRA-14502) + * Revert running dtests by default in circleci (CASSANDRA-14614) + * Stream entire SSTables when possible (CASSANDRA-14556) + * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592) + * Add experimental support for Java 11 (CASSANDRA-9608) + * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580) + * Improve logging in MessageInHandler's constructor (CASSANDRA-14576) + * Set broadcast address in internode messaging handshake (CASSANDRA-14579) + * Wait for schema agreement prior to building MVs (CASSANDRA-14571) + * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426) + * Bump the hints messaging version to match the current one (CASSANDRA-14536) + * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546) + * Report why native_transport_port fails to bind (CASSANDRA-14544) + * Optimize internode messaging protocol (CASSANDRA-14485) + * Internode messaging handshake sends wrong messaging version number (CASSANDRA-14540) + * Add a virtual table to expose active client connections (CASSANDRA-14458) + * Clean up and refactor client metrics (CASSANDRA-14524) + * Nodetool import row cache invalidation races with adding 
sstables to tracker (CASSANDRA-14529) + * Fix assertions in LWTs after TableMetadata was made immutable (CASSANDRA-14356) + * Abort compactions quicker (CASSANDRA-14397) + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529) + * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509) + * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467) + * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457) + * Let nodetool import take a list of directories (CASSANDRA-14442) + * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488) + * Implement virtual keyspace interface (CASSANDRA-7622) + * nodetool import cleanup and improvements (CASSANDRA-14417) + * Bump jackson version to >= 2.9.5 (CASSANDRA-14427) + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360) + * Audit logging for database activity (CASSANDRA-12151) + * Clean up build artifacts in docs container (CASSANDRA-14432) + * Minor network authz improvements (CASSANDRA-14413) + * Automatic sstable upgrades (CASSANDRA-14197) + * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431) + * Cassandra-stress throws NPE if insert section isn't specified in user profile (CASSANDRA-14426) + * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335) + * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281) + * Add network authz (CASSANDRA-13985) + * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389) + * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381) + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402) + * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392) + * Client TOPOLOGY_CHANGE messages have wrong port. 
(CASSANDRA-14398) + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719) + * Eliminate background repair and probabilistic read_repair_chance table options + (CASSANDRA-13910) + * Bind to correct local address in 4.0 streaming (CASSANDRA-14362) + * Use standard Amazon naming for datacenter and rack in Ec2Snitch (CASSANDRA-7839) + * Fix junit failure for SSTableReaderTest (CASSANDRA-14387) + * Abstract write path for pluggable storage (CASSANDRA-14118) + * nodetool describecluster should be more informative (CASSANDRA-13853) + * Compaction performance improvements (CASSANDRA-14261) + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260) + * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889) + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354) + * Add support for hybrid MIN(), MAX() speculative retry policies + (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352) + * Fix some regressions caused by 14058 (CASSANDRA-14353) + * Abstract repair for pluggable storage (CASSANDRA-14116) + * Add meaningful toString() impls (CASSANDRA-13653) + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884) + * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713) + * Add coordinator write metric per CF (CASSANDRA-14232) + * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314) + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315) + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322) + * Add ability to specify driver name and version (CASSANDRA-14275) + * Abstract streaming for pluggable storage (CASSANDRA-14115) + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294) + * Use Murmur3 for validation compactions (CASSANDRA-14002) + * Comma at the end of the seed list is interpreted as localhost (CASSANDRA-14285) + * Refactor read executor and response 
resolver, abstract read repair (CASSANDRA-14058) + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993) + * Add a few options to nodetool verify (CASSANDRA-14201) + * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183) + * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259) + * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226) + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132) + * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214) + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174) + * Add nodetool clientlist (CASSANDRA-13665) + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211) + * Non-disruptive seed node list reload (CASSANDRA-14190) + * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185) + * Migrate dtests to use pytest and python3 (CASSANDRA-14134) + * Allow storage port to be configurable per node (CASSANDRA-7544) + * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182) + * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067) + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152) + * Delete temp test files on exit (CASSANDRA-14153) + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867) + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066) + * Fix cassandra-stress startup failure (CASSANDRA-14106) + * Remove initialDirectories from CFS (CASSANDRA-13928) + * Fix trivial log format error (CASSANDRA-14015) + * Allow sstabledump to do a json object per partition (CASSANDRA-13848) + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200) + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081) + * Fix 
Distribution.average in cassandra-stress (CASSANDRA-14090) + * Support a means of logging all queries as they were invoked (CASSANDRA-13983) + * Presize collections (CASSANDRA-13760) + * Add GroupCommitLogService (CASSANDRA-13530) + * Parallelize initial materialized view build (CASSANDRA-12245) + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965) + * Make LWTs send resultset metadata on every request (CASSANDRA-13992) + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963) + * Introduce leaf-only iterator (CASSANDRA-9988) + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997) + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182) + * Refactoring to specialised functional interfaces (CASSANDRA-13982) + * Speculative retry should allow more friendly params (CASSANDRA-13876) + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944) + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291) + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728) + * Fix some alerts raised by static analysis (CASSANDRA-13799) + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593) + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786) + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941) + * Expose recent histograms in JmxHistograms (CASSANDRA-13642) + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899) + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906) + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925) + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961) + * Correctly close netty channels when a stream session ends (CASSANDRA-13905) + * Update lz4 to 1.4.0 (CASSANDRA-13741) + * Optimize Paxos prepare and propose stage for local requests 
(CASSANDRA-13862) + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299) + * Use compaction threshold for STCS in L0 (CASSANDRA-13861) + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703) + * Add extra information to SASI timeout exception (CASSANDRA-13677) + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818) + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786) + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846) + * Add keyspace and table name in schema validation exception (CASSANDRA-13845) + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771) + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837) + * Race condition when closing stream sessions (CASSANDRA-13852) + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831) + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696) + * Add stress profile yaml with LWT (CASSANDRA-7960) + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789) + * Simplify mx4j configuration (CASSANDRA-13578) + * Fix trigger example on 4.0 (CASSANDRA-13796) + * Force minimum timeout value (CASSANDRA-9375) + * Use netty for streaming (CASSANDRA-12229) + * Use netty for internode messaging (CASSANDRA-8457) + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774) + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758) + * Fix pending repair manager index out of bounds check (CASSANDRA-13769) + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576) + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664) + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594) + * Fix race / ref leak in anticompaction (CASSANDRA-13688) + * Expose tasks queue 
length via JMX (CASSANDRA-12758) + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751) + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615) + * Improve sstablemetadata output (CASSANDRA-11483) + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371) + * Introduce error metrics for repair (CASSANDRA-13387) + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732) + * Update metrics to 3.1.5 (CASSANDRA-13648) + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699) + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725) + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727) + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996) + * Default for start_native_transport now true if not set in config (CASSANDRA-13656) + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583) + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148) + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271) + * Use common nowInSec for validation compactions (CASSANDRA-13671) + * Improve handling of IR prepare failures (CASSANDRA-13672) + * Send IR coordinator messages synchronously (CASSANDRA-13673) + * Flush system.repair table before IR finalize promise (CASSANDRA-13660) + * Fix column filter creation for wildcard queries (CASSANDRA-13650) + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614) + * fix race condition in PendingRepairManager (CASSANDRA-13659) + * Allow noop incremental repair state transitions (CASSANDRA-13658) + * Run repair with down replicas (CASSANDRA-10446) + * Added started & completed repair metrics (CASSANDRA-13598) + * Added started & completed repair metrics (CASSANDRA-13598) + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130) + * Improve calculation of available 
disk space for compaction (CASSANDRA-13068) + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579) + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570) + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585) + * Fix Randomness of stress values (CASSANDRA-12744) + * Allow selecting Map values and Set elements (CASSANDRA-7396) + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444) + * Update repairTime for keyspaces on completion (CASSANDRA-13539) + * Add configurable upper bound for validation executor threads (CASSANDRA-13521) + * Bring back maxHintTTL property (CASSANDRA-12982) + * Add testing guidelines (CASSANDRA-13497) + * Add more repair metrics (CASSANDRA-13531) + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650) + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367) + * Log time elapsed for each incremental repair phase (CASSANDRA-13498) + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780) + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262) + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421) + * Improve incremental repair logging (CASSANDRA-13468) + * Start compaction when incremental repair finishes (CASSANDRA-13454) + * Add repair streaming preview (CASSANDRA-13257) + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430) + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661) + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354) + * Skip building views during base table streams on range movements (CASSANDRA-13065) + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197) + * Remove deprecated repair JMX APIs 
(CASSANDRA-11530) + * Fix version check to enable streaming keep-alive (CASSANDRA-12929) + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289) + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324) + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360) + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359) + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336) + * Incremental repair not streaming correct sstables (CASSANDRA-13328) + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300) + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132) + * Remove config option index_interval (CASSANDRA-10671) + * Reduce lock contention for collection types and serializers (CASSANDRA-13271) + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283) + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292) + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520) + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226) + * Remove unused method (CASSANDRA-13227) + * Fix minor bugs related to #9143 (CASSANDRA-13217) + * Output warning if user increases RF (CASSANDRA-13079) + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081) + * Add support for + and - operations on dates (CASSANDRA-11936) + * Fix consistency of incrementally repaired data (CASSANDRA-9143) + * Increase commitlog version (CASSANDRA-13161) + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425) + * Refactor ColumnCondition (CASSANDRA-12981) + * Parallelize streaming of different keyspaces (CASSANDRA-4663) + * Improved compactions metrics (CASSANDRA-13015) + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031) + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855) + * Thrift removal (CASSANDRA-11115) + * Remove pre-3.0 
compatibility code for 4.0 (CASSANDRA-12716) + * Add column definition kind to dropped columns in schema (CASSANDRA-12705) + * Add (automate) Nodetool Documentation (CASSANDRA-12672) + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736) + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681) + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422) + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080) + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084) + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510) + * Allow IN restrictions on column families with collections (CASSANDRA-12654) + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028) + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029) + * Add mutation size and batch metrics (CASSANDRA-12649) + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999) + * Expose time spent waiting in thread pool queue (CASSANDRA-8398) + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969) + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946) + * Add support for arithmetic operators (CASSANDRA-11935) + * Add histogram for delay to deliver hints (CASSANDRA-13234) + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720) + * Trivial format error in StorageProxy (CASSANDRA-13551) + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480) + * Anticompaction can cause noisy log messages (CASSANDRA-13684) + * Switch to client init for sstabledump (CASSANDRA-13683) + * CQLSH: Don't pause when capturing data (CASSANDRA-13743) + * nodetool clearsnapshot requires --all to clear all snapshots 
(CASSANDRA-13391) + * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527) + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243) + * Multi-version in-JVM dtests (CASSANDRA-14937) + * Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170) + 3.11.5 - * Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169) * Make sure user defined compaction transactions are always closed (CASSANDRA-15123) * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305) * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903) diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 326c005,fa00e5b..61129bb --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -2194,29 -2259,25 +2195,28 @@@ public class ColumnFamilyStore implemen // and so we only run one major compaction at a time synchronized (this) { - logger.trace("Cancelling in-progress compactions for {}", metadata.cfName); + logger.trace("Cancelling in-progress compactions for {}", metadata.name); + Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes + ? concatWithIndexes() + : Collections.singleton(this); - Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews - ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs()) - : concatWithIndexes(); + toInterruptFor = interruptViews + ? 
Iterables.concat(toInterruptFor, viewManager.allViewsCfs()) + : toInterruptFor; - for (ColumnFamilyStore cfs : toInterruptFor) - cfs.getCompactionStrategyManager().pause(); - try + try (CompactionManager.CompactionPauser pause = CompactionManager.instance.pauseGlobalCompaction(); - CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(selfWithAuxiliaryCfs)) ++ CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(toInterruptFor)) { // interrupt in-progress compactions - CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation); - CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs); + CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation); + CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate); // doublecheck that we finished, instead of timing out - for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) + for (ColumnFamilyStore cfs : toInterruptFor) { - if (!cfs.getTracker().getCompacting().isEmpty()) + if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate)) { - logger.warn("Unable to cancel in-progress compactions for {}. Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.cfName); + logger.warn("Unable to cancel in-progress compactions for {}. 
Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name); return null; } } diff --cc src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index 09bed74,a6dbd9d..275ce70 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@@ -184,32 -165,15 +184,38 @@@ public final class CompactionInf stopRequested = true; } + /** + * if this compaction involves several/all tables we can safely check globalCompactionsPaused + * in isStopRequested() below + */ + public abstract boolean isGlobal(); + public boolean isStopRequested() { - return stopRequested; + return stopRequested || (isGlobal() && CompactionManager.instance.isGlobalCompactionPaused()); } } + + public enum Unit + { + BYTES("bytes"), RANGES("token range parts"), KEYS("keys"); + + private final String name; + + Unit(String name) + { + this.name = name; + } + + @Override + public String toString() + { + return this.name; + } + + public static boolean isFileSize(String unit) + { + return BYTES.toString().equals(unit); + } + } } diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 789d1ee,9aba938..1128108 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@@ -120,10 -124,14 +120,15 @@@ public class CompactionIterator extend type, bytesRead, totalBytes, - compactionId); + compactionId, + sstables); } + public boolean isGlobal() + { + return false; + } + private void updateCounterFor(int rows) { assert rows > 0 && rows - 1 < mergeCounters.length; diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index bb1d585,a08d08b..cfca7d9 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -131,15 -118,11 +131,18 @@@ public class 
CompactionManager implemen @VisibleForTesting final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create(); + public final ActiveCompactions active = new ActiveCompactions(); + + // used to temporarily pause non-strategy managed compactions (like index summary redistribution) + private final AtomicInteger globalCompactionPauseCount = new AtomicInteger(0); + private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE); + public CompactionMetrics getMetrics() + { + return metrics; + } + /** * Gets compaction rate limiter. * Rate unit is bytes per sec. @@@ -2161,14 -2136,25 +2164,37 @@@ } } ++ + public List<CompactionInfo> getSSTableTasks() + { + return active.getCompactions() + .stream() + .map(CompactionInfo.Holder::getCompactionInfo) + .filter(task -> task.getTaskType() != OperationType.COUNTER_CACHE_SAVE + && task.getTaskType() != OperationType.KEY_CACHE_SAVE + && task.getTaskType() != OperationType.ROW_CACHE_SAVE) + .collect(Collectors.toList()); + } ++ + /** + * Return whether "global" compactions should be paused, used by ColumnFamilyStore#runWithCompactionsDisabled + * + * a global compaction is one that includes several/all tables, currently only IndexSummaryBuilder + */ + public boolean isGlobalCompactionPaused() + { + return globalCompactionPauseCount.get() > 0; + } + + public CompactionPauser pauseGlobalCompaction() + { + CompactionPauser pauser = globalCompactionPauseCount::decrementAndGet; + globalCompactionPauseCount.incrementAndGet(); + return pauser; + } + + public interface CompactionPauser extends AutoCloseable + { + public void close(); + } } diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 685da2e,2efcd11..725f04d --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -184,13 -184,17 +184,15 @@@ public class CompactionTask extends Abs long lastBytesScanned = 0; - if 
(!controller.cfs.getCompactionStrategyManager().isActive()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - if (collector != null) - collector.beginCompaction(ci); -- + activeCompactions.beginCompaction(ci); - try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact)) { + // Note that we need to re-check this flag after calling beginCompaction above to avoid a window + // where the compaction does not exist in activeCompactions but the CSM gets paused. + // We already have the sstables marked compacting here so CompactionManager#waitForCessation will + // block until the below exception is thrown and the transaction is cancelled. + if (!controller.cfs.getCompactionStrategyManager().isActive()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); estimatedKeys = writer.estimatedKeys(); while (ci.hasNext()) { diff --cc src/java/org/apache/cassandra/db/view/ViewBuilderTask.java index d041b48,0000000..c84c697 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java @@@ -1,269 -1,0 +1,274 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.view; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.util.concurrent.Futures; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.ReadQuery; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInfo.Unit; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import 
org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Refs; + +public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<Long> +{ + private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class); + + private static final int ROWS_BETWEEN_CHECKPOINTS = 1000; + + private final ColumnFamilyStore baseCfs; + private final View view; + private final Range<Token> range; + private final UUID compactionId; + private volatile Token prevToken; + private volatile long keysBuilt = 0; + private volatile boolean isStopped = false; + private volatile boolean isCompactionInterrupted = false; + + @VisibleForTesting + public ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt) + { + this.baseCfs = baseCfs; + this.view = view; + this.range = range; + this.compactionId = UUIDGen.getTimeUUID(); + this.prevToken = lastToken; + this.keysBuilt = keysBuilt; + } + + @SuppressWarnings("resource") + private void buildKey(DecoratedKey key) + { + ReadQuery selectQuery = view.getReadQuery(); + + if (!selectQuery.selectsKey(key)) + { + logger.trace("Skipping {}, view query filters", key); + return; + } + + int nowInSec = FBUtilities.nowInSeconds(); + SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec); + + // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates + // and pretend that there is nothing pre-existing. 
+ UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false); + + try (ReadExecutionController orderGroup = command.executionController(); + UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command)) + { + Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager + .forTable(baseCfs.metadata.id) + .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true); + + AtomicLong noBase = new AtomicLong(Long.MAX_VALUE); + mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime())); + } + } + + public Long call() + { + String ksName = baseCfs.metadata.keyspace; + + if (prevToken == null) + logger.debug("Starting new view build for range {}", range); + else + logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt); + + /* + * It's possible for view building to start before MV creation got propagated to other nodes. For this reason + * we should wait for schema to converge before attempting to send any view mutations to other nodes, or else + * face UnknownTableException upon Mutation deserialization on the nodes that haven't processed the schema change. 
+ */ + boolean schemaConverged = Gossiper.instance.waitForSchemaAgreement(10, TimeUnit.SECONDS, () -> this.isStopped); + if (!schemaConverged) + logger.warn("Failed to get schema to converge before building view {}.{}", baseCfs.keyspace.getName(), view.name); + + Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function; + function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds())); + + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(function); + Refs<SSTableReader> sstables = viewFragment.refs; + ReducingKeyIterator keyIter = new ReducingKeyIterator(sstables)) + { + PeekingIterator<DecoratedKey> iter = Iterators.peekingIterator(keyIter); + while (!isStopped && iter.hasNext()) + { + DecoratedKey key = iter.next(); + Token token = key.getToken(); + //skip tokens already built or not present in range + if (range.contains(token) && (prevToken == null || token.compareTo(prevToken) > 0)) + { + buildKey(key); + ++keysBuilt; + //build other keys sharing the same token + while (iter.hasNext() && iter.peek().getToken().equals(token)) + { + key = iter.next(); + buildKey(key); + ++keysBuilt; + } + if (keysBuilt % ROWS_BETWEEN_CHECKPOINTS == 1) + SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, token, keysBuilt); + prevToken = token; + } + } + } + + finish(); + + return keysBuilt; + } + + private void finish() + { + String ksName = baseCfs.keyspace.getName(); + if (!isStopped) + { + // Save the completed status using the end of the range as last token. 
This way it will be possible for + // future view build attempts to don't even create a task for this range + SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, range.right, keysBuilt); + + logger.debug("Completed build of view({}.{}) for range {} after covering {} keys ", ksName, view.name, range, keysBuilt); + } + else + { + logger.debug("Stopped build for view({}.{}) for range {} after covering {} keys", ksName, view.name, range, keysBuilt); + + // If it's stopped due to a compaction interruption we should throw that exception. + // Otherwise we assume that the task has been stopped due to a schema update and we can finish successfully. + if (isCompactionInterrupted) + throw new StoppedException(ksName, view.name, getCompactionInfo()); + } + } + + @Override + public CompactionInfo getCompactionInfo() + { + // we don't know the sstables at construction of ViewBuilderTask and we could change this to return once we know the + // but since we basically only cancel view builds on truncation where we cancel all compactions anyway, this seems reasonable + + // If there's splitter, calculate progress based on last token position + if (range.left.getPartitioner().splitter().isPresent()) + { + long progress = prevToken == null ? 
0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000); + return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId); + } + + // When there is no splitter, estimate based on number of total keys but + // take the max with keysBuilt + 1 to avoid having more completed than total + long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range)); + return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId); + } + + @Override + public void stop() + { + stop(true); + } + ++ public boolean isGlobal() ++ { ++ return false; ++ } ++ + synchronized void stop(boolean isCompactionInterrupted) + { + isStopped = true; + this.isCompactionInterrupted = isCompactionInterrupted; + } + + long keysBuilt() + { + return keysBuilt; + } + + /** + * {@link CompactionInterruptedException} with {@link Object#equals(Object)} and {@link Object#hashCode()} + * implementations that consider equals all the exceptions produced by the same view build, independently of their + * token range. + * <p> + * This is used to avoid Guava's {@link Futures#allAsList(Iterable)} log spamming when multiple build tasks fail + * due to compaction interruption. 
+ */ + static class StoppedException extends CompactionInterruptedException + { + private final String ksName, viewName; + + private StoppedException(String ksName, String viewName, CompactionInfo info) + { + super(info); + this.ksName = ksName; + this.viewName = viewName; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof StoppedException)) + return false; + + StoppedException that = (StoppedException) o; + return Objects.equal(this.ksName, that.ksName) && Objects.equal(this.viewName, that.viewName); + } + + @Override + public int hashCode() + { + return 31 * ksName.hashCode() + viewName.hashCode(); + } + } +} diff --cc src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java index 9ec8a4e,8276626..73dc334 --- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java @@@ -25,4 -25,8 +25,9 @@@ import org.apache.cassandra.db.compacti public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder { public abstract void build(); ++ + public boolean isGlobal() + { + return false; + } } diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 1f4059a,dea1cd6..880b738 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@@ -228,13 -224,13 +228,15 @@@ public class IndexSummaryManager implem public void redistributeSummaries() throws IOException { + if (CompactionManager.instance.isGlobalCompactionPaused()) + return; - Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables(); + Pair<Long, Map<TableId, LifecycleTransaction>> redistributionTransactionInfo = getRestributionTransactions(); + Map<TableId, LifecycleTransaction> transactions = redistributionTransactionInfo.right; + long nonRedistributingOffHeapSize = redistributionTransactionInfo.left; try { - 
redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left, - compactingAndNonCompacting.right, + redistributeSummaries(new IndexSummaryRedistribution(transactions, + nonRedistributingOffHeapSize, this.memoryPoolBytes)); } catch (Exception e) diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java index a8fcad1,d80adc0..1300c99 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java @@@ -305,9 -312,14 +306,14 @@@ public class IndexSummaryRedistributio public CompactionInfo getCompactionInfo() { - return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId); + return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId); } + public boolean isGlobal() + { + return true; + } + /** Utility class for sorting sstables by their read rates. 
*/ private static class ReadRateComparator implements Comparator<SSTableReader> { diff --cc test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java index 976180c,bcbe92d..60433bb --- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java @@@ -18,436 -18,88 +18,506 @@@ package org.apache.cassandra.db.compaction; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import com.google.common.collect.ImmutableSet; - import org.junit.BeforeClass; + import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Test; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.metrics.CompactionMetrics; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.repair.PendingAntiCompaction; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.StubIndex; +import org.apache.cassandra.index.internal.CollatedViewIndexBuilder; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; +import 
org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.schema.MockSchema; ++import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.FBUtilities; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import static org.junit.Assert.fail; public class CancelCompactionsTest extends CQLTester { + /** + * makes sure we only cancel compactions if the precidate says we have overlapping sstables + */ + @Test + public void cancelTest() throws InterruptedException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + List<SSTableReader> sstables = createSSTables(cfs, 10, 0); + Set<SSTableReader> toMarkCompacting = new HashSet<>(sstables.subList(0, 3)); + + TestCompactionTask tct = new TestCompactionTask(cfs, toMarkCompacting); + try + { + tct.start(); + + List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions(); + assertEquals(1, activeCompactions.size()); + assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting); + // predicate requires the non-compacting sstables, should not cancel the one currently compacting: + cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false, true); + assertEquals(1, activeCompactions.size()); + assertFalse(activeCompactions.get(0).isStopRequested()); + + // predicate requires the compacting ones - make sure stop is requested and that when we abort that + // compaction we actually run the callable (countdown the latch) + CountDownLatch cdl = new CountDownLatch(1); + Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, 
false, false, true)); + t.start(); + while (!activeCompactions.get(0).isStopRequested()) + Thread.sleep(100); + + // cdl.countDown will not get executed until we have aborted all compactions for the sstables in toMarkCompacting + assertFalse(cdl.await(2, TimeUnit.SECONDS)); + tct.abort(); + // now the compactions are aborted and we can successfully wait for the latch + t.join(); + assertTrue(cdl.await(2, TimeUnit.SECONDS)); + } + finally + { + tct.abort(); + } + } + + /** + * make sure we only cancel relevant compactions when there are multiple ongoing compactions + */ + @Test + public void multipleCompactionsCancelTest() throws InterruptedException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + List<SSTableReader> sstables = createSSTables(cfs, 10, 0); + + List<TestCompactionTask> tcts = new ArrayList<>(); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 3)))); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(6, 9)))); + + try + { + tcts.forEach(TestCompactionTask::start); + + List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions(); + assertEquals(2, activeCompactions.size()); + + Set<Set<SSTableReader>> compactingSSTables = new HashSet<>(); + compactingSSTables.add(activeCompactions.get(0).getCompactionInfo().getSSTables()); + compactingSSTables.add(activeCompactions.get(1).getCompactionInfo().getSSTables()); + Set<Set<SSTableReader>> expectedSSTables = new HashSet<>(); + expectedSSTables.add(new HashSet<>(sstables.subList(0, 3))); + expectedSSTables.add(new HashSet<>(sstables.subList(6, 9))); + assertEquals(compactingSSTables, expectedSSTables); + + cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false, true); + assertEquals(2, activeCompactions.size()); + assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested)); + + CountDownLatch cdl = new CountDownLatch(1); + // start a compaction which only needs the sstables 
where first token is > 50 - these are the sstables compacted by tcts.get(1) + Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false, true)); + t.start(); + activeCompactions = CompactionManager.instance.active.getCompactions(); + assertEquals(2, activeCompactions.size()); + Thread.sleep(500); + for (CompactionInfo.Holder holder : activeCompactions) + { + if (holder.getCompactionInfo().getSSTables().containsAll(sstables.subList(6, 9))) + assertTrue(holder.isStopRequested()); + else + assertFalse(holder.isStopRequested()); + } + tcts.get(1).abort(); + assertEquals(1, CompactionManager.instance.active.getCompactions().size()); + cdl.await(); + t.join(); + } + finally + { + tcts.forEach(TestCompactionTask::abort); + } + } + + /** + * Makes sure sub range compaction now only cancels the relevant compactions, not all of them + */ + @Test + public void testSubrangeCompaction() throws InterruptedException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + List<SSTableReader> sstables = createSSTables(cfs, 10, 0); + + List<TestCompactionTask> tcts = new ArrayList<>(); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2)))); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4)))); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7)))); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9)))); + try + { + tcts.forEach(TestCompactionTask::start); + + List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions(); + assertEquals(4, activeCompactions.size()); + Range<Token> range = new Range<>(token(0), token(49)); + Thread t = new Thread(() -> { + try + { + cfs.forceCompactionForTokenRange(Collections.singleton(range)); + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + }); + + t.start(); + + Thread.sleep(500); + assertEquals(4, 
CompactionManager.instance.active.getCompactions().size()); + List<TestCompactionTask> toAbort = new ArrayList<>(); + for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions()) + { + if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range)))) + { + assertTrue(holder.isStopRequested()); + for (TestCompactionTask tct : tcts) + if (tct.sstables.equals(holder.getCompactionInfo().getSSTables())) + toAbort.add(tct); + } + else + assertFalse(holder.isStopRequested()); + } + assertEquals(2, toAbort.size()); + toAbort.forEach(TestCompactionTask::abort); + t.join(); + + } + finally + { + tcts.forEach(TestCompactionTask::abort); + } + } + + @Test + public void testAnticompaction() throws InterruptedException, ExecutionException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + List<SSTableReader> sstables = createSSTables(cfs, 10, 0); + List<SSTableReader> alreadyRepairedSSTables = createSSTables(cfs, 10, 10); + for (SSTableReader sstable : alreadyRepairedSSTables) + AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis()); + assertEquals(20, cfs.getLiveSSTables().size()); + List<TestCompactionTask> tcts = new ArrayList<>(); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2)))); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4)))); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7)))); + tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9)))); + + List<TestCompactionTask> nonAffectedTcts = new ArrayList<>(); + nonAffectedTcts.add(new TestCompactionTask(cfs, new HashSet<>(alreadyRepairedSSTables))); + + try + { + tcts.forEach(TestCompactionTask::start); + nonAffectedTcts.forEach(TestCompactionTask::start); + List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions(); + assertEquals(5, activeCompactions.size()); + // make sure 
that sstables are fully contained so that the metadata gets mutated + Range<Token> range = new Range<>(token(-1), token(49)); + + UUID prsid = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(prsid, InetAddressAndPort.getLocalHost(), Collections.singletonList(cfs), Collections.singleton(range), true, 1, true, PreviewKind.NONE); + + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); + RangesAtEndpoint rae = RangesAtEndpoint.builder(local).add(new Replica(local, range, true)).build(); + + PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), rae, Executors.newSingleThreadExecutor(), () -> false); + Future<?> fut = pac.run(); + Thread.sleep(600); + List<TestCompactionTask> toAbort = new ArrayList<>(); + for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions()) + { + if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range)) && !sstable.isRepaired() && !sstable.isPendingRepair())) + { + assertTrue(holder.isStopRequested()); + for (TestCompactionTask tct : tcts) + if (tct.sstables.equals(holder.getCompactionInfo().getSSTables())) + toAbort.add(tct); + } + else + assertFalse(holder.isStopRequested()); + } + assertEquals(2, toAbort.size()); + toAbort.forEach(TestCompactionTask::abort); + fut.get(); + for (SSTableReader sstable : sstables) + assertTrue(!sstable.intersects(Collections.singleton(range)) || sstable.isPendingRepair()); + } + finally + { + tcts.forEach(TestCompactionTask::abort); + nonAffectedTcts.forEach(TestCompactionTask::abort); + } + } + + /** + * Make sure index rebuilds get cancelled + */ + @Test + public void testIndexRebuild() throws ExecutionException, InterruptedException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + List<SSTableReader> sstables = createSSTables(cfs, 5, 0); + Index idx = new StubIndex(cfs, null); + CountDownLatch indexBuildStarted = new 
CountDownLatch(1); + CountDownLatch indexBuildRunning = new CountDownLatch(1); + CountDownLatch compactionsStopped = new CountDownLatch(1); + ReducingKeyIterator reducingKeyIterator = new ReducingKeyIterator(sstables) + { + @Override + public boolean hasNext() + { + indexBuildStarted.countDown(); + try + { + indexBuildRunning.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(); + } + return false; + } + }; + Future<?> f = CompactionManager.instance.submitIndexBuild(new CollatedViewIndexBuilder(cfs, Collections.singleton(idx), reducingKeyIterator, ImmutableSet.copyOf(sstables))); + // wait for hasNext to get called + indexBuildStarted.await(); + assertEquals(1, CompactionManager.instance.active.getCompactions().size()); + boolean foundCompaction = false; + for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions()) + { + if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables))) + { + assertFalse(holder.isStopRequested()); + foundCompaction = true; + } + } + assertTrue(foundCompaction); + cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false, true); + // wait for the runWithCompactionsDisabled callable + compactionsStopped.await(); + assertEquals(1, CompactionManager.instance.active.getCompactions().size()); + foundCompaction = false; + for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions()) + { + if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables))) + { + assertTrue(holder.isStopRequested()); + foundCompaction = true; + } + } + assertTrue(foundCompaction); + // signal that the index build should be finished + indexBuildRunning.countDown(); + f.get(); + assertTrue(CompactionManager.instance.active.getCompactions().isEmpty()); + } + + long first(SSTableReader sstable) + { + return (long)sstable.first.getToken().getTokenValue(); + } + + Token token(long t) + { + return new 
Murmur3Partitioner.LongToken(t); + } + + private List<SSTableReader> createSSTables(ColumnFamilyStore cfs, int count, int startGeneration) + { + List<SSTableReader> sstables = new ArrayList<>(); + for (int i = 0; i < count; i++) + { + long first = i * 10; + long last = (i + 1) * 10 - 1; + sstables.add(MockSchema.sstable(startGeneration + i, 0, true, first, last, cfs)); + } + cfs.disableAutoCompaction(); + cfs.addSSTables(sstables); + return sstables; + } + + private static class TestCompactionTask + { + private ColumnFamilyStore cfs; + private final Set<SSTableReader> sstables; + private LifecycleTransaction txn; + private CompactionController controller; + private CompactionIterator ci; + private List<ISSTableScanner> scanners; + + public TestCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables) + { + this.cfs = cfs; + this.sstables = sstables; + } + + public void start() + { + scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList()); + txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + assertNotNull(txn); + controller = new CompactionController(cfs, sstables, Integer.MIN_VALUE); + ci = new CompactionIterator(txn.opType(), scanners, controller, FBUtilities.nowInSeconds(), UUID.randomUUID()); + CompactionManager.instance.active.beginCompaction(ci); + } + + public void abort() + { + if (controller != null) + controller.close(); + if (ci != null) + ci.close(); + if (txn != null) + txn.abort(); + if (scanners != null) + scanners.forEach(ISSTableScanner::close); + CompactionManager.instance.active.finishCompaction(ci); + + } + } + + @Test + public void test2iCancellation() throws Throwable + { + createTable("create table %s (id int primary key, something int)"); + createIndex("create index on %s(something)"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 10; i++) + execute("insert into %s (id, something) values (?, ?)", i, i); + flush(); + ColumnFamilyStore idx = 
getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next(); + Set<SSTableReader> sstables = new HashSet<>(); + try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION)) + { + getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false); + } + // the predicate only gets compacting sstables, and we are only compacting the 2i sstables - with interruptIndexes = false we should see no sstables here + assertTrue(sstables.isEmpty()); + } + + @Test + public void testSubrangeCompactionWith2i() throws Throwable + { + createTable("create table %s (id int primary key, something int)"); + createIndex("create index on %s(something)"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 10; i++) + execute("insert into %s (id, something) values (?, ?)", i, i); + flush(); + ColumnFamilyStore idx = getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next(); + try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION)) + { + IPartitioner partitioner = getCurrentColumnFamilyStore().getPartitioner(); + getCurrentColumnFamilyStore().forceCompactionForTokenRange(Collections.singleton(new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken()))); + } + } ++ + @Test + public void testStandardCompactionTaskCancellation() throws Throwable + { + createTable("create table %s (id int primary key, something int)"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + + for (int i = 0; i < 10; i++) + { + execute("insert into %s (id, something) values (?,?)", i, i); + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + AbstractCompactionTask ct = null; + + for (List<AbstractCompactionStrategy> css : getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies()) + { + for 
(AbstractCompactionStrategy cs : css) + { + ct = cs.getNextBackgroundTask(0); + if (ct != null) + break; + } - if (ct != null) - break; ++ if (ct != null) break; + } + assertNotNull(ct); + + CountDownLatch waitForBeginCompaction = new CountDownLatch(1); + CountDownLatch waitForStart = new CountDownLatch(1); - Iterable<CFMetaData> metadatas = Collections.singleton(getCurrentColumnFamilyStore().metadata); ++ Iterable<TableMetadata> metadatas = Collections.singleton(getCurrentColumnFamilyStore().metadata()); + /* + Here we ask strategies to pause & interrupt compactions right before calling beginCompaction in CompactionTask + The code running in the separate thread below mimics CFS#runWithCompactionsDisabled but we only allow + the real beginCompaction to be called after pausing & interrupting. + */ + Thread t = new Thread(() -> { + Uninterruptibles.awaitUninterruptibly(waitForBeginCompaction); + getCurrentColumnFamilyStore().getCompactionStrategyManager().pause(); - CompactionManager.instance.interruptCompactionFor(metadatas, false); ++ CompactionManager.instance.interruptCompactionFor(metadatas, (s) -> true, false); + waitForStart.countDown(); - CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore())); ++ CompactionManager.instance.waitForCessation(Collections.singleton(getCurrentColumnFamilyStore()), (s) -> true); + getCurrentColumnFamilyStore().getCompactionStrategyManager().resume(); + }); + t.start(); + + try + { - ct.execute(new CompactionMetrics() ++ ct.execute(new ActiveCompactions() + { + @Override + public void beginCompaction(CompactionInfo.Holder ci) + { + waitForBeginCompaction.countDown(); + Uninterruptibles.awaitUninterruptibly(waitForStart); + super.beginCompaction(ci); + } + }); + fail("execute should throw CompactionInterruptedException"); + } + catch (CompactionInterruptedException cie) + { + // expected + } + finally + { + ct.transaction.abort(); + t.join(); + } + } } diff --cc 
test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java index 6bef001,0000000..1c5c245 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java @@@ -1,747 -1,0 +1,762 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.AbstractPendingRepairTest; +import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.CompactionIterator; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; 
+import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.repair.consistent.LocalSessionAccessor; +import org.apache.cassandra.schema.MockSchema; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.concurrent.Transactional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest +{ + static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class); + + private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback + { + public InstrumentedAcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint ranges) + { + super(parentRepairSession, ranges, () -> false); + } + + Set<TableId> submittedCompactions = new HashSet<>(); + + ListenableFuture<?> submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result) + { + submittedCompactions.add(result.cfs.metadata.id); + result.abort(); // prevent ref leak complaints + return ListenableFutureTask.create(() -> {}, null); + } + } + + /** + * verify the pending anti compaction happy path + */ + @Test + public void successCase() throws Exception + { + 
Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass()); + cfs.disableAutoCompaction(); + + // create 2 sstables, one that will be split, and another that will be moved + for (int i = 0; i < 8; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(); + for (int i = 8; i < 12; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(); + assertEquals(2, cfs.getLiveSSTables().size()); + + Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6)); + Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16)); + List<ColumnFamilyStore> tables = Lists.newArrayList(cfs); + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(left, right)); + + // create a session so the anti compaction can find it + UUID sessionID = UUIDGen.getTimeUUID(); + ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, PreviewKind.NONE); + + PendingAntiCompaction pac; + ExecutorService executor = Executors.newSingleThreadExecutor(); + try + { + pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor, () -> false); + pac.run().get(); + } + finally + { + executor.shutdown(); + } + + assertEquals(3, cfs.getLiveSSTables().size()); + int pendingRepair = 0; + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + if (sstable.isPendingRepair()) + pendingRepair++; + } + assertEquals(2, pendingRepair); + } + + @Test + public void acquisitionSuccess() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(6); + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + List<SSTableReader> expected = sstables.subList(0, 3); + Collection<Range<Token>> ranges = new HashSet<>(); + for 
(SSTableReader sstable : expected) + { + ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); + } + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID(), 0, 0); + + logger.info("SSTables: {}", sstables); + logger.info("Expected: {}", expected); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNotNull(result); + logger.info("Originals: {}", result.txn.originals()); + assertEquals(3, result.txn.originals().size()); + for (SSTableReader sstable : expected) + { + logger.info("Checking {}", sstable); + assertTrue(result.txn.originals().contains(sstable)); + } + + assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); + result.abort(); + } + + @Test + public void repairedSSTablesAreNotAcquired() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + assertEquals(2, sstables.size()); + SSTableReader repaired = sstables.get(0); + SSTableReader unrepaired = sstables.get(1); + assertTrue(repaired.intersects(FULL_RANGE)); + assertTrue(unrepaired.intersects(FULL_RANGE)); + + repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false); + repaired.reloadSSTableMetadata(); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNotNull(result); + + logger.info("Originals: {}", result.txn.originals()); + assertEquals(1, result.txn.originals().size()); + assertTrue(result.txn.originals().contains(unrepaired)); + result.abort(); // release sstable refs + } + + @Test + public void finalizedPendingRepairSSTablesAreNotAcquired() throws Exception + { + cfs.disableAutoCompaction(); + 
makeSSTables(2); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + assertEquals(2, sstables.size()); + SSTableReader repaired = sstables.get(0); + SSTableReader unrepaired = sstables.get(1); + assertTrue(repaired.intersects(FULL_RANGE)); + assertTrue(unrepaired.intersects(FULL_RANGE)); + + UUID sessionId = prepareSession(); + LocalSessionAccessor.finalizeUnsafe(sessionId); + repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false); + repaired.reloadSSTableMetadata(); + assertTrue(repaired.isPendingRepair()); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNotNull(result); + + logger.info("Originals: {}", result.txn.originals()); + assertEquals(1, result.txn.originals().size()); + assertTrue(result.txn.originals().contains(unrepaired)); + result.abort(); // releases sstable refs + } + + @Test + public void conflictingSessionAcquisitionFailure() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + assertEquals(2, sstables.size()); + SSTableReader repaired = sstables.get(0); + SSTableReader unrepaired = sstables.get(1); + assertTrue(repaired.intersects(FULL_RANGE)); + assertTrue(unrepaired.intersects(FULL_RANGE)); + + UUID sessionId = prepareSession(); + repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false); + repaired.reloadSSTableMetadata(); + assertTrue(repaired.isPendingRepair()); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNull(result); + } + + @Test + 
public void pendingRepairNoSSTablesExist() throws Exception + { + cfs.disableAutoCompaction(); + + assertEquals(0, cfs.getLiveSSTables().size()); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNotNull(result); + + result.abort(); // There's nothing to release, but we should exit cleanly + } + + /** + * anti compaction task should be submitted if everything is ok + */ + @Test + public void callbackSuccess() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNotNull(result); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES)); + assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result)); + + assertEquals(1, cb.submittedCompactions.size()); + assertTrue(cb.submittedCompactions.contains(cfm.id)); + } + + /** + * If one of the supplied AcquireResults is null, either an Exception was thrown, or + * we couldn't get a transaction for the sstables. 
In either case we need to cancel the repair, and release + * any sstables acquired for other tables + */ + @Test + public void callbackNullResult() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNotNull(result); + assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList())); + assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result, null)); + + assertTrue(cb.submittedCompactions.isEmpty()); + assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state()); + } + + /** + * If an AcquireResult has a null txn, there were no sstables to acquire references + * for, so no anti compaction should have been submitted. 
+ */ + @Test + public void callbackNullTxn() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + assertNotNull(result); + + ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id); + PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES)); + assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result, fakeResult)); + + assertEquals(1, cb.submittedCompactions.size()); + assertTrue(cb.submittedCompactions.contains(cfm.id)); + assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id)); + } + + + @Test + public void singleAnticompaction() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + UUID sessionID = UUIDGen.getTimeUUID(); + ActiveRepairService.instance.registerParentRepairSession(sessionID, + InetAddressAndPort.getByName("127.0.0.1"), + Lists.newArrayList(cfs), + FULL_RANGE, + true,0, + true, + PreviewKind.NONE); + CompactionManager.instance.performAnticompaction(result.cfs, atEndpoint(FULL_RANGE, NO_RANGES), result.refs, result.txn, sessionID, () -> false); + + } + + @Test (expected = CompactionInterruptedException.class) + public void cancelledAntiCompaction() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(1); + + PendingAntiCompaction.AcquisitionCallable 
acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + UUID sessionID = UUIDGen.getTimeUUID(); + ActiveRepairService.instance.registerParentRepairSession(sessionID, + InetAddressAndPort.getByName("127.0.0.1"), + Lists.newArrayList(cfs), + FULL_RANGE, + true,0, + true, + PreviewKind.NONE); + + // attempt to anti-compact the sstable in half + SSTableReader sstable = Iterables.getOnlyElement(cfs.getLiveSSTables()); + Token left = cfs.getPartitioner().midpoint(sstable.first.getToken(), sstable.last.getToken()); + Token right = sstable.last.getToken(); + CompactionManager.instance.performAnticompaction(result.cfs, + atEndpoint(Collections.singleton(new Range<>(left, right)), NO_RANGES), + result.refs, result.txn, sessionID, () -> true); + } + + /** + * Makes sure that PendingAntiCompaction fails when anticompaction throws exception + */ + @Test + public void antiCompactionException() + { + cfs.disableAutoCompaction(); + makeSSTables(2); + UUID prsid = UUID.randomUUID(); + ListeningExecutorService es = MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService()); + PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false) { + @Override + protected AcquisitionCallback getAcquisitionCallback(UUID prsId, RangesAtEndpoint tokenRanges) + { + return new AcquisitionCallback(prsid, tokenRanges, () -> false) + { + @Override + ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result) + { + Runnable r = new WrappedRunnable() + { + protected void runMayThrow() + { + throw new CompactionInterruptedException(null); + } + }; + return es.submit(r); + } + }; + } + }; + ListenableFuture<?> fut = pac.run(); + try + { + fut.get(); + fail("Should throw exception"); + } + catch(Throwable t) + { + } + } + + @Test + public void testBlockedAcquisition() 
throws ExecutionException, InterruptedException, TimeoutException + { + cfs.disableAutoCompaction(); + ExecutorService es = Executors.newFixedThreadPool(1); + + makeSSTables(2); + UUID prsid = UUID.randomUUID(); + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList()); + try + { + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + CompactionController controller = new CompactionController(cfs, sstables, 0); + CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.active, () -> false)) + { + // `ci` is our imaginary ongoing anticompaction which makes no progress until after 30s + // now we try to start a new AC, which will try to cancel all ongoing compactions + + CompactionManager.instance.active.beginCompaction(ci); + PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), 0, 0, es, () -> false); + ListenableFuture fut = pac.run(); + try + { + fut.get(30, TimeUnit.SECONDS); + fail("the future should throw exception since we try to start a new anticompaction when one is already running"); + } + catch (ExecutionException e) + { + assertTrue(e.getCause() instanceof PendingAntiCompaction.SSTableAcquisitionException); + } + + assertEquals(1, getCompactionsFor(cfs).size()); + for (CompactionInfo.Holder holder : getCompactionsFor(cfs)) + assertFalse(holder.isStopRequested()); + } + } + finally + { + es.shutdown(); + ISSTableScanner.closeAllAndPropagate(scanners, null); + } + } + + private List<CompactionInfo.Holder> getCompactionsFor(ColumnFamilyStore cfs) + { + List<CompactionInfo.Holder> compactions = new ArrayList<>(); + for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions()) + { + if 
(holder.getCompactionInfo().getTableMetadata().equals(cfs.metadata())) + compactions.add(holder); + } + return compactions; + } + + @Test + public void testUnblockedAcquisition() throws ExecutionException, InterruptedException + { + cfs.disableAutoCompaction(); + ExecutorService es = Executors.newFixedThreadPool(1); + makeSSTables(2); + UUID prsid = prepareSession(); + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList()); + try + { + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + CompactionController controller = new CompactionController(cfs, sstables, 0); + CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, UUID.randomUUID())) + { + // `ci` is our imaginary ongoing anticompaction which makes no progress until after 5s + // now we try to start a new AC, which will try to cancel all ongoing compactions + + CompactionManager.instance.active.beginCompaction(ci); + PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false); + ListenableFuture fut = pac.run(); + try + { + fut.get(5, TimeUnit.SECONDS); + } + catch (TimeoutException e) + { + // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled, but we are not iterating + // CompactionIterator so the compaction is not actually cancelled + } + try + { + assertTrue(ci.hasNext()); + ci.next(); + fail("CompactionIterator should be abortable"); + } + catch (CompactionInterruptedException e) + { + CompactionManager.instance.active.finishCompaction(ci); + txn.abort(); + // expected + } + CountDownLatch cdl = new CountDownLatch(1); + Futures.addCallback(fut, new FutureCallback<Object>() + { + public void onSuccess(@Nullable Object o) + { + cdl.countDown(); + } + + public void onFailure(Throwable 
throwable) + { + } + }, MoreExecutors.directExecutor()); + assertTrue(cdl.await(1, TimeUnit.MINUTES)); + } + } + finally + { + es.shutdown(); + } + } + + @Test + public void testSSTablePredicateOngoingAntiCompaction() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + cfs.disableAutoCompaction(); + List<SSTableReader> sstables = new ArrayList<>(); + List<SSTableReader> repairedSSTables = new ArrayList<>(); + List<SSTableReader> pendingSSTables = new ArrayList<>(); + for (int i = 1; i <= 10; i++) + { + SSTableReader sstable = MockSchema.sstable(i, i * 10, i * 10 + 9, cfs); + sstables.add(sstable); + } + for (int i = 1; i <= 10; i++) + { + SSTableReader sstable = MockSchema.sstable(i + 10, i * 10, i * 10 + 9, cfs); + AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis()); + repairedSSTables.add(sstable); + } + for (int i = 1; i <= 10; i++) + { + SSTableReader sstable = MockSchema.sstable(i + 20, i * 10, i * 10 + 9, cfs); + AbstractPendingRepairTest.mutateRepaired(sstable, UUID.randomUUID(), false); + pendingSSTables.add(sstable); + } + + cfs.addSSTables(sstables); + cfs.addSSTables(repairedSSTables); + + // if we are compacting the non-repaired non-pending sstables, we should get an error + tryPredicate(cfs, sstables, null, true); + // make sure we don't try to grab pending or repaired sstables; + tryPredicate(cfs, repairedSSTables, sstables, false); + tryPredicate(cfs, pendingSSTables, sstables, false); + } + + private void tryPredicate(ColumnFamilyStore cfs, List<SSTableReader> compacting, List<SSTableReader> expectedLive, boolean shouldFail) + { + CompactionInfo.Holder holder = new CompactionInfo.Holder() + { + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, UUID.randomUUID(), compacting); + } ++ ++ public boolean isGlobal() ++ { ++ return false; ++ } + }; + CompactionManager.instance.active.beginCompaction(holder); + try + { + 
PendingAntiCompaction.AntiCompactionPredicate predicate = + new PendingAntiCompaction.AntiCompactionPredicate(Collections.singleton(new Range<>(new Murmur3Partitioner.LongToken(0), new Murmur3Partitioner.LongToken(100))), + UUID.randomUUID()); + Set<SSTableReader> live = cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet()); + if (shouldFail) + fail("should fail - we try to grab already anticompacting sstables for anticompaction"); + assertEquals(live, new HashSet<>(expectedLive)); + } + catch (PendingAntiCompaction.SSTableAcquisitionException e) + { + if (!shouldFail) + fail("We should not fail filtering sstables"); + } + finally + { + CompactionManager.instance.active.finishCompaction(holder); + } + } + + @Test + public void testRetries() throws InterruptedException, ExecutionException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + cfs.addSSTable(MockSchema.sstable(1, true, cfs)); + CountDownLatch cdl = new CountDownLatch(5); + ExecutorService es = Executors.newFixedThreadPool(1); + CompactionInfo.Holder holder = new CompactionInfo.Holder() + { + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables()); + } ++ ++ public boolean isGlobal() ++ { ++ return false; ++ } + }; + try + { + PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID()) + { + @Override + public boolean apply(SSTableReader sstable) + { + cdl.countDown(); + if (cdl.getCount() > 0) + throw new PendingAntiCompaction.SSTableAcquisitionException("blah"); + return true; + } + }; + CompactionManager.instance.active.beginCompaction(holder); + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 10, 1, acp); + Future f = es.submit(acquisitionCallable); + cdl.await(); + assertNotNull(f.get()); + } + finally + { + 
es.shutdown(); + CompactionManager.instance.active.finishCompaction(holder); + } + } + + @Test + public void testRetriesTimeout() throws InterruptedException, ExecutionException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + cfs.addSSTable(MockSchema.sstable(1, true, cfs)); + ExecutorService es = Executors.newFixedThreadPool(1); + CompactionInfo.Holder holder = new CompactionInfo.Holder() + { + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables()); + } ++ ++ public boolean isGlobal() ++ { ++ return false; ++ } + }; + try + { + PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID()) + { + @Override + public boolean apply(SSTableReader sstable) + { + throw new PendingAntiCompaction.SSTableAcquisitionException("blah"); + } + }; + CompactionManager.instance.active.beginCompaction(holder); + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 2, 1000, acp); + Future fut = es.submit(acquisitionCallable); + assertNull(fut.get()); + } + finally + { + es.shutdown(); + CompactionManager.instance.active.finishCompaction(holder); + } + } + + @Test + public void testWith2i() throws ExecutionException, InterruptedException + { + cfs2.disableAutoCompaction(); + makeSSTables(2, cfs2, 100); + ColumnFamilyStore idx = cfs2.indexManager.getAllIndexColumnFamilyStores().iterator().next(); + ExecutorService es = Executors.newFixedThreadPool(1); + try + { + UUID prsid = prepareSession(); + for (SSTableReader sstable : cfs2.getLiveSSTables()) + assertFalse(sstable.isPendingRepair()); + + // mark the sstables pending, with a 2i compaction going, which should be untouched; + try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION)) + { + PendingAntiCompaction pac = 
new PendingAntiCompaction(prsid, Collections.singleton(cfs2), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false); + pac.run().get(); + } + // and make sure it succeeded; + for (SSTableReader sstable : cfs2.getLiveSSTables()) + assertTrue(sstable.isPendingRepair()); + } + finally + { + es.shutdown(); + } + } + + private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans) + { + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local); + for (Range<Token> range : full) + builder.add(new Replica(local, range, true)); + + for (Range<Token> range : trans) + builder.add(new Replica(local, range, false)); + + return builder.build(); + } +} diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 68ee3e1,b33ead2..d383e88 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@@ -61,6 -57,7 +61,7 @@@ import org.apache.cassandra.utils.ByteB import static com.google.common.collect.ImmutableMap.of; import static java.util.Arrays.asList; -import static org.apache.cassandra.db.compaction.AntiCompactionTest.assertOnDiskState; ++import static org.apache.cassandra.Util.assertOnDiskState; import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD; import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD; @@@ -68,7 -65,9 +69,8 @@@ import static org.junit.Assert.assertEq import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; + import static org.junit.Assert.fail; - @RunWith(OrderedJUnit4ClassRunner.class) public class IndexSummaryManagerTest { @@@ -640,69 -633,91 +642,111 @@@ final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>(); // barrier to 
control when redistribution runs final CountDownLatch barrier = new CountDownLatch(1); + CompactionInfo.Holder ongoingCompaction = new CompactionInfo.Holder() + { + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, UUID.randomUUID(), compacting); + } + - Thread t = NamedThreadFactory.createThread(new Runnable() ++ public boolean isGlobal() ++ { ++ return false; ++ } + }; + try (LifecycleTransaction ignored = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN)) { - public void run() + CompactionManager.instance.active.beginCompaction(ongoingCompaction); + + Thread t = NamedThreadFactory.createThread(new Runnable() { - try + public void run() { - // Don't leave enough space for even the minimal index summaries - try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + try + { + // Don't leave enough space for even the minimal index summaries + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(of(cfs.metadata.id, txn), + 0, + singleSummaryOffHeapSpace, + barrier)); + } + } + catch (CompactionInterruptedException ex) + { + exception.set(ex); + } + catch (IOException ignored) { - IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST, - of(cfs.metadata.cfId, txn), - singleSummaryOffHeapSpace, - barrier)); } } - catch (CompactionInterruptedException ex) - { - exception.set(ex); - } - catch (IOException ignored) - { - } - } - }); - t.start(); - while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive()) - Thread.sleep(1); - // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries - // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution - // to proceed until stopCompaction has been called. 
- cancelFunction.accept(cfs); - // allows the redistribution to proceed - barrier.countDown(); - t.join(); + }); + + t.start(); + while (CompactionManager.instance.getActiveCompactions() < 2 && t.isAlive()) + Thread.sleep(1); + // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries + // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution + // to proceed until stopCompaction has been called. + cancelFunction.accept(cfs); + // allows the redistribution to proceed + barrier.countDown(); + t.join(); + } + finally + { + CompactionManager.instance.active.finishCompaction(ongoingCompaction); + } assertNotNull("Expected compaction interrupted exception", exception.get()); - assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty()); + assertTrue("Expected no active compactions", CompactionManager.instance.active.getCompactions().isEmpty()); - Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables); + Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(allSSTables); Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables()); Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables); assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s", Joiner.on(",").join(disjoint)), disjoint.isEmpty()); - Util.assertOnDiskState(cfs, 8); - - assertOnDiskState(cfs, numSSTables); ++ assertOnDiskState(cfs, 8); + validateData(cfs, numRows); + } + + @Test + public void testPauseIndexSummaryManager() throws Exception + { + String ksname = KEYSPACE1; + String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching + Keyspace keyspace = Keyspace.open(ksname); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + int numSSTables = 4; + int numRows = 256; + createSSTables(ksname, cfname, numSSTables, numRows); + + 
List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + for (SSTableReader sstable : sstables) + sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0)); + + long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize(); + + // everything should get cut in half + assert sstables.size() == numSSTables; + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + try (AutoCloseable toresume = CompactionManager.instance.pauseGlobalCompaction()) + { - sstables = redistributeSummaries(Collections.emptyList(), of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2))); ++ sstables = redistributeSummaries(Collections.emptyList(), of(cfs.metadata().id, txn), (singleSummaryOffHeapSpace * (numSSTables / 2))); + fail("The redistribution should fail - we got paused before adding to active compactions, but after marking compacting"); + } + } + catch (CompactionInterruptedException e) + { + // expected + } + for (SSTableReader sstable : sstables) + assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); validateData(cfs, numRows); + assertOnDiskState(cfs, numSSTables); } private static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
