Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41904684 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41904684 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41904684 Branch: refs/heads/trunk Commit: 41904684bb5509595d11f008d0851c7ce625e020 Parents: 2d2879d 14e46e4 Author: Marcus Eriksson <[email protected]> Authored: Fri Nov 24 14:20:57 2017 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Fri Nov 24 14:20:57 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/BlacklistedDirectories.java | 10 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 13 ++ .../org/apache/cassandra/db/DiskBoundaries.java | 71 +++++++++ .../cassandra/db/DiskBoundaryManager.java | 153 +++++++++++++++++++ src/java/org/apache/cassandra/db/Keyspace.java | 2 + src/java/org/apache/cassandra/db/Memtable.java | 20 +-- .../db/compaction/CompactionManager.java | 8 +- .../compaction/CompactionStrategyManager.java | 47 +++--- .../cassandra/db/compaction/Scrubber.java | 2 +- .../writers/CompactionAwareWriter.java | 13 +- .../sstable/format/RangeAwareSSTableWriter.java | 13 +- .../cassandra/service/StorageService.java | 58 ------- .../cassandra/db/DiskBoundaryManagerTest.java | 124 +++++++++++++++ 14 files changed, 422 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 81b3a6e,fc18dc3..4456af5 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,168 -1,5 +1,169 @@@ +4.0 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965) + * Make LWTs send resultset metadata on every request (CASSANDRA-13992) + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963) + * Introduce leaf-only iterator (CASSANDRA-9988) + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997) + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182) + * Refactoring to specialised functional interfaces (CASSANDRA-13982) + * Speculative retry should allow more friendly params (CASSANDRA-13876) + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944) + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291) + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728) + * Fix some alerts raised by static analysis (CASSANDRA-13799) + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593) + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786) + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941) + * Expose recent histograms in JmxHistograms (CASSANDRA-13642) + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899) + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906) + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925) + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961) + * Correctly close netty channels when a stream session ends (CASSANDRA-13905) + * Update lz4 to 1.4.0 (CASSANDRA-13741) + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862) + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299) + * Use compaction threshold for STCS in L0 (CASSANDRA-13861) + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703) + * Add extra information to SASI timeout exception (CASSANDRA-13677) + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818) + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786) + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846) + * Add keyspace and table name in schema validation exception (CASSANDRA-13845) + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771) + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837) + * Race condition when closing stream sessions (CASSANDRA-13852) + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831) + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696) + * Add stress profile yaml with LWT (CASSANDRA-7960) + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789) + * Simplify mx4j configuration (Cassandra-13578) + * Fix trigger example on 4.0 (CASSANDRA-13796) + * Force minumum timeout value (CASSANDRA-9375) + * Use netty for streaming (CASSANDRA-12229) + * Use netty for internode messaging (CASSANDRA-8457) + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774) + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758) + * Fix pending repair manager index out of bounds check (CASSANDRA-13769) + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576) + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664) + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594) + * Fix race / ref leak in anticompaction (CASSANDRA-13688) + * Expose tasks queue length via JMX (CASSANDRA-12758) + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751) + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615) + * Improve sstablemetadata output (CASSANDRA-11483) + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371) + * Introduce error metrics for repair (CASSANDRA-13387) + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732) + * Update metrics to 3.1.5 (CASSANDRA-13648) + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699) + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725) + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727) + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996) + * Default for start_native_transport now true if not set in config (CASSANDRA-13656) + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583) + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148) + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271) + * Use common nowInSec for validation compactions (CASSANDRA-13671) + * Improve handling of IR prepare failures (CASSANDRA-13672) + * Send IR coordinator messages synchronously (CASSANDRA-13673) + * Flush system.repair table before IR finalize promise (CASSANDRA-13660) + * Fix column filter creation for wildcard queries (CASSANDRA-13650) + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614) + * fix race condition in PendingRepairManager (CASSANDRA-13659) + * Allow noop incremental repair state transitions (CASSANDRA-13658) + * Run repair with down replicas (CASSANDRA-10446) + * Added started & completed repair metrics (CASSANDRA-13598) + * Added started & completed repair metrics (CASSANDRA-13598) + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130) + * Improve calculation of available disk space for compaction (CASSANDRA-13068) + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579) + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570) + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585) + * Fix Randomness of stress values (CASSANDRA-12744) + * Allow selecting Map values and Set elements (CASSANDRA-7396) + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444) + * Update repairTime for keyspaces on completion (CASSANDRA-13539) + * Add configurable upper bound for validation executor threads (CASSANDRA-13521) + * Bring back maxHintTTL propery (CASSANDRA-12982) + * Add testing guidelines (CASSANDRA-13497) + * Add more repair metrics (CASSANDRA-13531) + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650) + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367) + * Log time elapsed for each incremental repair phase (CASSANDRA-13498) + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780) + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262) + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421) + * Improve incremental repair logging (CASSANDRA-13468) + * Start compaction when incremental repair finishes (CASSANDRA-13454) + * Add repair streaming preview (CASSANDRA-13257) + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430) + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661) + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354) + * Skip building views during base table streams on range movements (CASSANDRA-13065) + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197) + * Remove deprecated repair JMX APIs (CASSANDRA-11530) + * Fix version check to enable streaming keep-alive (CASSANDRA-12929) + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289) + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324) + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360) + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359) + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336) + * Incremental repair not streaming correct sstables (CASSANDRA-13328) + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300) + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132) + * Remove config option index_interval (CASSANDRA-10671) + * Reduce lock contention for collection types and serializers (CASSANDRA-13271) + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283) + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292) + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520) + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226) + * Remove unused method (CASSANDRA-13227) + * Fix minor bugs related to #9143 (CASSANDRA-13217) + * Output warning if user increases RF (CASSANDRA-13079) + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081) + * Add support for + and - operations on dates (CASSANDRA-11936) + * Fix consistency of incrementally repaired data (CASSANDRA-9143) + * Increase commitlog version (CASSANDRA-13161) + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425) + * Refactor ColumnCondition (CASSANDRA-12981) + * Parallelize streaming of different keyspaces (CASSANDRA-4663) + * Improved compactions metrics (CASSANDRA-13015) + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031) + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855) + * Thrift removal (CASSANDRA-11115) + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716) + * Add column definition kind to dropped columns in schema (CASSANDRA-12705) + * Add (automate) Nodetool Documentation (CASSANDRA-12672) + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736) + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681) + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422) + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080) + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084) + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510) + * Allow IN restrictions on column families with collections (CASSANDRA-12654) + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028) + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029) + * Add mutation size and batch metrics (CASSANDRA-12649) + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999) + * Expose time spent waiting in thread pool queue (CASSANDRA-8398) + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969) + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946) + * Add support for arithmetic operators (CASSANDRA-11935) + * Add histogram for delay to deliver hints (CASSANDRA-13234) + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720) + * Trivial format error in StorageProxy (CASSANDRA-13551) + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480) + * Anticompaction can cause noisy log messages (CASSANDRA-13684) + * Switch to client init for sstabledump (CASSANDRA-13683) + * CQLSH: Don't pause when capturing data (CASSANDRA-13743) + + 3.11.2 + * Cache disk boundaries (CASSANDRA-13215) * Add asm jar to build.xml for maven builds (CASSANDRA-11193) * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897) * Update jackson JSON jars (CASSANDRA-13949) http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 10505e6,6305096..1b3ddb5 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -24,13 -23,12 +24,14 @@@ import java.util.concurrent.Callable import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.function.Supplier; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; + import org.apache.cassandra.db.DiskBoundaries; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.index.Index; -import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -52,8 -50,6 +52,7 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.notifications.*; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.service.ActiveRepairService; - import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Pair; /** * Manages the compaction strategies. @@@ -288,68 -239,26 +286,67 @@@ public class CompactionStrategyManager if (!cfs.getPartitioner().splitter().isPresent()) return 0; - Directories.DataDirectory[] directories = locations.getWriteableLocations(); - List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories); - if (boundaries == null) - return getCompactionStrategyIndex(locations, sstable.descriptor); + DiskBoundaries boundaries = cfs.getDiskBoundaries(); + List<Directories.DataDirectory> directories = boundaries.directories; - + if (boundaries.positions == null) - { - // try to figure out location based on sstable directory: - for (int i = 0; i < directories.size(); i++) - { - Directories.DataDirectory directory = directories.get(i); - if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath())) - return i; - } - return 0; - } ++ return getCompactionStrategyIndex(directories, sstable.descriptor); - int pos = Collections.binarySearch(boundaries, sstable.first); + int pos = Collections.binarySearch(boundaries.positions, sstable.first); assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal return -pos - 1; } + /** + * get the index for the descriptor based on the existing directories + * @param locations + * @param descriptor + * @return + */ - private static int getCompactionStrategyIndex(Directories locations, Descriptor descriptor) ++ private static int getCompactionStrategyIndex(List<Directories.DataDirectory> directories, Descriptor descriptor) + { - Directories.DataDirectory[] directories = locations.getWriteableLocations(); + // try to figure out location based on sstable directory: - for (int i = 0; i < directories.length; i++) ++ for (int i = 0; i < directories.size(); i++) + { - Directories.DataDirectory directory = directories[i]; ++ Directories.DataDirectory directory = directories.get(i); + if (descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath())) + return i; + } + return 0; + } + + @VisibleForTesting + List<AbstractCompactionStrategy> getRepaired() + { + return repaired; + } + + @VisibleForTesting + List<AbstractCompactionStrategy> getUnrepaired() + { + return unrepaired; + } + + @VisibleForTesting + List<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID) + { + List<AbstractCompactionStrategy> strategies = new ArrayList<>(pendingRepairs.size()); + pendingRepairs.forEach(p -> strategies.add(p.get(sessionID))); + return strategies; + } + + @VisibleForTesting + Set<UUID> pendingRepairs() + { + Set<UUID> ids = new HashSet<>(); + pendingRepairs.forEach(p -> ids.addAll(p.getSessions())); + return ids; + } + + public boolean hasDataForPendingRepair(UUID sessionID) + { + return Iterables.any(pendingRepairs, prm -> prm.hasDataForSession(sessionID)); + } + public void shutdown() { writeLock.lock(); @@@ -553,20 -449,16 +550,20 @@@ for (SSTableReader sstable : removed) { - int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + int i = getCompactionStrategyIndex(cfs, sstable); - if (sstable.isRepaired()) + if (sstable.isPendingRepair()) + pendingRemoved.get(i).add(sstable); + else if (sstable.isRepaired()) repairedRemoved.get(i).add(sstable); else unrepairedRemoved.get(i).add(sstable); } for (SSTableReader sstable : added) { - int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + int i = getCompactionStrategyIndex(cfs, sstable); - if (sstable.isRepaired()) + if (sstable.isPendingRepair()) + pendingAdded.get(i).add(sstable); + else if (sstable.isRepaired()) repairedAdded.get(i).add(sstable); else unrepairedAdded.get(i).add(sstable); @@@ -613,16 -494,9 +610,16 @@@ { for (SSTableReader sstable : sstables) { - int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + int index = getCompactionStrategyIndex(cfs, sstable); - if (sstable.isRepaired()) + if (sstable.isPendingRepair()) + { + pendingRepairs.get(index).addSSTable(sstable); + unrepaired.get(index).removeSSTable(sstable); + repaired.get(index).removeSSTable(sstable); + } + else if (sstable.isRepaired()) { + pendingRepairs.get(index).removeSSTable(sstable); unrepaired.get(index).removeSSTable(sstable); repaired.get(index).addSSTable(sstable); } @@@ -744,22 -618,6 +741,22 @@@ readLock.lock(); try { + for (SSTableReader sstable : sstables) + { - int idx = getCompactionStrategyIndex(cfs, getDirectories(), sstable); ++ int idx = getCompactionStrategyIndex(cfs, sstable); + if (sstable.isPendingRepair()) + pendingSSTables.get(idx).add(sstable); + else if (sstable.isRepaired()) + repairedSSTables.get(idx).add(sstable); + else + unrepairedSSTables.get(idx).add(sstable); + } + + for (int i = 0; i < pendingSSTables.size(); i++) + { + if (!pendingSSTables.get(i).isEmpty()) + scanners.addAll(pendingRepairs.get(i).getScanners(pendingSSTables.get(i), ranges)); + } for (int i = 0; i < repairedSSTables.size(); i++) { if (!repairedSSTables.get(i).isEmpty()) @@@ -845,17 -685,13 +842,17 @@@ SSTableReader firstSSTable = Iterables.getFirst(input, null); assert firstSSTable != null; boolean repaired = firstSSTable.isRepaired(); - int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable); + int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable); + boolean isPending = firstSSTable.isPendingRepair(); + UUID pendingRepair = firstSSTable.getSSTableMetadata().pendingRepair; for (SSTableReader sstable : input) { if (sstable.isRepaired() != repaired) throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction"); - if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable)) + if (firstIndex != getCompactionStrategyIndex(cfs, sstable)) throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction"); + if (isPending && !pendingRepair.equals(sstable.getSSTableMetadata().pendingRepair)) + throw new UnsupportedOperationException("You can't compact sstables from different pending repair sessions"); } } @@@ -922,16 -751,13 +919,16 @@@ try { Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream() - .filter(s -> !s.isMarkedSuspect() && s.isRepaired()) + .filter(s -> !s.isMarkedSuspect() && s.isRepaired() && !s.isPendingRepair()) - .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s))); Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream() - .filter(s -> !s.isMarkedSuspect() && !s.isRepaired()) + .filter(s -> !s.isMarkedSuspect() && !s.isRepaired() && !s.isPendingRepair()) - .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s))); + Map<Integer, List<SSTableReader>> pendingSSTables = sstables.stream() + .filter(s -> !s.isMarkedSuspect() && s.isPendingRepair()) - .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); ++ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s))); for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet()) ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore)); @@@ -1084,16 -899,14 +1081,16 @@@ readLock.lock(); try { - if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) - { - return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); - } + // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written + int index = cfs.getPartitioner().splitter().isPresent() - ? getCompactionStrategyIndex(getDirectories(), descriptor) ++ ? getCompactionStrategyIndex(Arrays.asList(getDirectories().getWriteableLocations()), descriptor) + : 0; + if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR) + return pendingRepairs.get(index).getOrCreate(pendingRepair).createSSTableMultiWriter(descriptor, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, collector, header, indexes, txn); + else if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + return unrepaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn); else - { - return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); - } + return repaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java index cd5238f,b1f2e9f..7219595 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@@ -99,9 -98,9 +99,9 @@@ public class Scrubber implements Closea List<SSTableReader> toScrub = Collections.singletonList(sstable); - int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable); + int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable); this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]); - this.isCommutative = cfs.metadata.isCounter(); + this.isCommutative = cfs.metadata().isCounter(); boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists(); this.isIndex = cfs.isIndex(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index e8f7d72,d2f816b..5ddd99c --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@@ -90,9 -88,9 +90,10 @@@ public abstract class CompactionAwareWr maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables); sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge); minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables); + pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables); - locations = cfs.getDirectories().getWriteableLocations(); - diskBoundaries = StorageService.getDiskBoundaries(cfs); + DiskBoundaries db = cfs.getDiskBoundaries(); + diskBoundaries = db.positions; + locations = db.directories; locationIndex = -1; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java index 88c60e5,353aacb..f289fe3 --- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@@ -32,8 -33,6 +33,7 @@@ import org.apache.cassandra.db.lifecycl import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.schema.TableId; - import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; public class RangeAwareSSTableWriter implements SSTableMultiWriter @@@ -53,14 -51,14 +53,15 @@@ private final List<SSTableReader> finishedReaders = new ArrayList<>(); private SSTableMultiWriter currentWriter = null; - public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException + public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException { - directories = cfs.getDirectories().getWriteableLocations(); + DiskBoundaries db = cfs.getDiskBoundaries(); + directories = db.directories; this.sstableLevel = sstableLevel; this.cfs = cfs; - this.estimatedKeys = estimatedKeys / directories.length; + this.estimatedKeys = estimatedKeys / directories.size(); this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; this.format = format; this.txn = txn; this.header = header; @@@ -93,8 -91,8 +94,8 @@@ if (currentWriter != null) finishedWriters.add(currentWriter); - Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories[currentIndex]), format); - Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); ++ Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java index 0000000,de79959..fc7c9a4 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java +++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java @@@ -1,0 -1,124 +1,124 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db; + + import java.io.File; + import java.net.InetAddress; + import java.net.UnknownHostException; + import java.util.List; + + import com.google.common.collect.Lists; + import org.junit.Assert; + import org.junit.Before; + import org.junit.Test; + + import org.apache.cassandra.cql3.CQLTester; + import org.apache.cassandra.dht.BootStrapper; + import org.apache.cassandra.locator.TokenMetadata; + import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.utils.FBUtilities; + + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertTrue; + import static org.junit.Assert.fail; + + public class DiskBoundaryManagerTest extends CQLTester + { + private DiskBoundaryManager dbm; + private MockCFS mock; + private Directories dirs; + + @Before + public void setup() + { + BlacklistedDirectories.clearUnwritableUnsafe(); + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress()); + createTable("create table %s (id int primary key, x text)"); + dbm = getCurrentColumnFamilyStore().diskBoundaryManager; - dirs = new Directories(getCurrentColumnFamilyStore().metadata, Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")), - new Directories.DataDirectory(new File("/tmp/2")), - new Directories.DataDirectory(new File("/tmp/3")))); ++ dirs = new Directories(getCurrentColumnFamilyStore().metadata.get(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")), ++ new Directories.DataDirectory(new File("/tmp/2")), ++ new Directories.DataDirectory(new File("/tmp/3")))); + mock = new MockCFS(getCurrentColumnFamilyStore(), dirs); + } + + @Test + public void getBoundariesTest() + { + DiskBoundaries dbv = dbm.getDiskBoundaries(mock); + Assert.assertEquals(3, dbv.positions.size()); + assertEquals(dbv.directories, dirs.getWriteableLocations()); + } + + @Test + public void blackListTest() + { + DiskBoundaries dbv = dbm.getDiskBoundaries(mock); + Assert.assertEquals(3, dbv.positions.size()); + assertEquals(dbv.directories, dirs.getWriteableLocations()); + BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3")); + dbv = dbm.getDiskBoundaries(mock); + Assert.assertEquals(2, dbv.positions.size()); + Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")), + new Directories.DataDirectory(new File("/tmp/2"))), + dbv.directories); + } + + @Test + public void updateTokensTest() throws UnknownHostException + { + DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock); + StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10")); + DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock); + assertFalse(dbv1.equals(dbv2)); + } + + @Test + public void alterKeyspaceTest() throws Throwable + { + DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock); + execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }"); + DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock); + // == on purpose - we just want to make sure that there is a new instance cached + assertFalse(dbv1 == dbv2); + DiskBoundaries dbv3 = dbm.getDiskBoundaries(mock); + assertTrue(dbv2 == dbv3); + + } + + private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2) + { + if (dir1.size() != dir2.length) + fail(); + for (int i = 0; i < dir2.length; i++) + { + if (!dir1.get(i).equals(dir2[i])) + fail(); + } + } + + // just to be able to override the data directories + private static class MockCFS extends ColumnFamilyStore + { + MockCFS(ColumnFamilyStore cfs, Directories dirs) + { + super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
