Merge branch cassandra-3.0 into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50a9b1ab Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50a9b1ab Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50a9b1ab Branch: refs/heads/trunk Commit: 50a9b1abb1a46d264343058837f334d5a73b9bda Parents: c80b9fb 7f668c6 Author: Robert Stupp <sn...@snazy.de> Authored: Mon Dec 12 20:39:37 2016 +0100 Committer: Robert Stupp <sn...@snazy.de> Committed: Mon Dec 12 20:39:41 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../concurrent/NamedThreadFactory.java | 22 +++++++++- .../AbstractCommitLogSegmentManager.java | 3 +- .../db/commitlog/AbstractCommitLogService.java | 3 +- .../cassandra/index/sasi/TermIterator.java | 3 +- .../cassandra/net/OutboundTcpConnection.java | 42 ++++++++++---------- .../apache/cassandra/repair/RepairRunnable.java | 4 +- .../scheduler/RoundRobinScheduler.java | 12 +++--- .../cassandra/service/StorageService.java | 7 ++-- 9 files changed, 58 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index d056492,5bc30be..7413086 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,114 -1,5 +1,115 @@@ -3.0.11 +3.10 + * Remove outboundBindAny configuration property (CASSANDRA-12673) + * Use correct bounds for all-data range when filtering (CASSANDRA-12666) + * Remove timing window in test case (CASSANDRA-12875) + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945) + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919) + * Fix validation of non-frozen UDT cells (CASSANDRA-12916) + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903) + * Fix Murmur3PartitionerTest 
(CASSANDRA-12858) + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897) + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283) + * Fix cassandra-stress truncate option (CASSANDRA-12695) + * Fix crossNode value when receiving messages (CASSANDRA-12791) + * Don't load MX4J beans twice (CASSANDRA-12869) + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838) + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836) + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845) + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454) + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777) + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419) + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803) + * Use different build directories for Eclipse and Ant (CASSANDRA-12466) + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815) + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812) + * Upgrade commons-codec to 1.9 (CASSANDRA-12790) + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550) + * Add duration data type (CASSANDRA-11873) + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) + * Improve sum aggregate functions (CASSANDRA-12417) + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) + * cqlsh fails to format collections when using aliases (CASSANDRA-11534) + * Check for hash conflicts in prepared statements (CASSANDRA-12733) + * Exit query parsing upon first error (CASSANDRA-12598) + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729) + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450) + * Config class uses 
boxed types but DD exposes primitive types (CASSANDRA-12199) + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461) + * Add hint delivery metrics (CASSANDRA-12693) + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731) + * ColumnIndex does not reuse buffer (CASSANDRA-12502) + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) + * Upgrade metrics-reporter dependencies (CASSANDRA-12089) + * Tune compaction thread count via nodetool (CASSANDRA-12248) + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) + * Include repair session IDs in repair start message (CASSANDRA-12532) + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667) + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) + * Fix cassandra-stress graphing (CASSANDRA-12237) + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) + * Add JMH benchmarks.jar (CASSANDRA-12586) + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) + * Add keep-alive to streaming (CASSANDRA-11841) + * Tracing payload is passed through newSession(..) 
(CASSANDRA-11706) + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261) + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486) + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) + * Add support to rebuild from targeted replica (CASSANDRA-9875) + * Add sequence distribution type to cassandra stress (CASSANDRA-12490) + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550) + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) + * Added slow query log (CASSANDRA-12403) + * Count full coordinated request against timeout (CASSANDRA-12256) + * Allow TTL with null value on insert and update (CASSANDRA-12216) + * Make decommission operation resumable (CASSANDRA-12008) + * Add support to one-way targeted repair (CASSANDRA-9876) + * Remove clientutil jar (CASSANDRA-11635) + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717) + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358) + * Cassandra stress should dump all setting on startup (CASSANDRA-11914) + * Make it possible to compact a given token range (CASSANDRA-10643) + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) + * Collect metrics on queries by consistency level (CASSANDRA-7384) + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers 
(CASSANDRA-12228) + * Upgrade to OHC 0.4.4 (CASSANDRA-12133) + * Add version command to cassandra-stress (CASSANDRA-12258) + * Create compaction-stress tool (CASSANDRA-11844) + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) + * Faster write path (CASSANDRA-12269) + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) + * Prepend snapshot name with "truncated" or "dropped" when a snapshot + is taken before truncating or dropping a table (CASSANDRA-12178) + * Optimize RestrictionSet (CASSANDRA-12153) + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150) + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613) + * Create a system table to expose prepared statements (CASSANDRA-8831) + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) + * Add supplied username to authentication error messages (CASSANDRA-12076) + * Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + * Restore resumable hints delivery (CASSANDRA-11960) + * Properly report LWT contention (CASSANDRA-12626) +Merged from 3.0: + * Thread local pools never cleaned up (CASSANDRA-13033) * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781) * CQL often queries static columns unnecessarily (CASSANDRA-12768) * Make sure sstables only get committed 
when it's safe to discard commit log records (CASSANDRA-12956) http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index 00ddf44,0000000..eff35f4 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@@ -1,550 -1,0 +1,551 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.nicoulaj.compilecommand.annotations.DontInline; ++import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + +/** + * Performs eager-creation of commit log segments in a background thread. All the + * public methods are thread safe. + */ +public abstract class AbstractCommitLogSegmentManager +{ + static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class); + + /** + * Segment that is ready to be used. The management thread fills this and blocks until consumed. + * + * A single management thread produces this, and consumers are already synchronizing to make sure other work is + * performed atomically with consuming this. Volatile to make sure writes by the management thread become + * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must + * synchronize on 'this'. + */ + private volatile CommitLogSegment availableSegment = null; + + private final WaitQueue segmentPrepared = new WaitQueue(); + + /** Active segments, containing unflushed data. 
The tail of this queue is the one we allocate writes to */ + private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>(); + + /** + * The segment we are currently allocating commit log records to. + * + * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value. + */ + private volatile CommitLogSegment allocatingFrom = null; + + final String storageDirectory; + + /** + * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size + * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic + * can see the effect of recycling segments immediately (even though they're really happening asynchronously + * on the manager thread, which will take a ms or two). + */ + private final AtomicLong size = new AtomicLong(); + + private Thread managerThread; + protected final CommitLog commitLog; + private volatile boolean shutdown; + + private static final SimpleCachedBufferPool bufferPool = + new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize()); + + AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory) + { + this.commitLog = commitLog; + this.storageDirectory = storageDirectory; + } + + void start() + { + // The run loop for the manager thread + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + while (!shutdown) + { + try + { + assert availableSegment == null; + logger.debug("No segments in reserve; creating a fresh one"); + availableSegment = createSegment(); + if (shutdown) + { + // If shutdown() started and finished during segment creation, we are now left with a + // segment that no one will consume. Discard it. 
+ discardAvailableSegment(); + return; + } + + segmentPrepared.signalAll(); + Thread.yield(); + + if (availableSegment == null && !atSegmentBufferLimit()) + // Writing threads need another segment now. + continue; + + // Writing threads are not waiting for new segments, we can spend time on other tasks. + // flush old Cfs if we're full + maybeFlushToReclaim(); + + LockSupport.park(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) + return; + // sleep some arbitrary period to avoid spamming CL + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + // If we offered a segment, wait for it to be taken before reentering the loop. + // There could be a new segment in next not offered, but only on failure to discard it while + // shutting down-- nothing more can or needs to be done in that case. + } + + while (availableSegment != null || atSegmentBufferLimit() && !shutdown) + LockSupport.park(); + } + } + }; + + shutdown = false; - managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR"); ++ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR"); + managerThread.start(); + + // for simplicity, ensure the first segment is allocated before continuing + advanceAllocatingFrom(null); + } + + private boolean atSegmentBufferLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); + } + + private void maybeFlushToReclaim() + { + long unused = unusedCapacity(); + if (unused < 0) + { + long flushingSize = 0; + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + flushingSize += segment.onDiskSize(); + segmentsToRecycle.add(segment); + if (flushingSize + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); + } + } + + + /** + * Allocate a segment within this CLSM. 
Should either succeed or throw. + */ + public abstract Allocation allocate(Mutation mutation, int size); + + /** + * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM + * decide what to do with those segments on disk after they've been replayed. + */ + abstract void handleReplayedSegment(final File file); + + /** + * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as task submit + * to segment manager so it's performed on segment management thread. + */ + abstract CommitLogSegment createSegment(); + + /** + * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment + * manager so it's performed on segment management thread, or perform while segment management thread is shutdown + * during testing resets. + * + * @param segment segment to be discarded + * @param delete whether or not the segment is safe to be deleted. + */ + abstract void discard(CommitLogSegment segment, boolean delete); + + /** + * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided. + * + * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM. + */ + @DontInline + void advanceAllocatingFrom(CommitLogSegment old) + { + while (true) + { + synchronized (this) + { + // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments + if (allocatingFrom != old) + return; + + // If a segment is ready, take it now, otherwise wait for the management thread to construct it. + if (availableSegment != null) + { + // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving + // the critical section. 
+ activeSegments.add(allocatingFrom = availableSegment); + availableSegment = null; + break; + } + } + + awaitAvailableSegment(old); + } + + // Signal the management thread to prepare a new segment. + wakeManager(); + + if (old != null) + { + // Now we can run the user defined command just after switching to the new commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.) + commitLog.archiver.maybeArchive(old); + + // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it + old.discardUnusedTail(); + } + + // request that the CL be synced out-of-band, as we've finished a segment + commitLog.requestExtraSync(); + } + + void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom) + { + do + { + WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time()); + if (availableSegment == null && allocatingFrom == currentAllocatingFrom) + prepared.awaitUninterruptibly(); + else + prepared.cancel(); + } + while (availableSegment == null && allocatingFrom == currentAllocatingFrom); + } + + /** + * Switch to a new segment, regardless of how much is left in the current one. 
+ * + * Flushes any dirty CFs for this segment and any older segments, and then discards the segments + */ + void forceRecycleAll(Iterable<UUID> droppedCfs) + { + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); + CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); + advanceAllocatingFrom(last); + + // wait for the commit log modifications + last.waitForModifications(); + + // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes + // to complete + Keyspace.writeOrder.awaitNewBarrier(); + + // flush and wait for all CFs that are dirty in segments up-to and including 'last' + Future<?> future = flushDataFrom(segmentsToRecycle, true); + try + { + future.get(); + + for (CommitLogSegment segment : activeSegments) + for (UUID cfId : droppedCfs) + segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + + // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() + // if the previous active segment was the only one to recycle (since an active segment isn't + // necessarily dirty, and we only call dCS after a flush). + for (CommitLogSegment segment : activeSegments) + { + if (segment.isUnused()) + archiveAndDiscard(segment); + } + + CommitLogSegment first; + if ((first = activeSegments.peek()) != null && first.id <= last.id) + logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); + } + catch (Throwable t) + { + // for now just log the error + logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); + } + } + + /** + * Indicates that a segment is no longer in use and that it should be discarded. 
+ * + * @param segment segment that is no longer in use + */ + void archiveAndDiscard(final CommitLogSegment segment) + { + boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); + if (!activeSegments.remove(segment)) + return; // already discarded + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. + logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script"); + discard(segment, archiveSuccess); + } + + /** + * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. + * @param addedSize + */ + void addSize(long addedSize) + { + size.addAndGet(addedSize); + } + + /** + * @return the space (in bytes) used by all segment files. + */ + public long onDiskSize() + { + return size.get(); + } + + private long unusedCapacity() + { + long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; + long currentSize = size.get(); + logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total); + return total - currentSize; + } + + /** + * Force a flush on all CFs that are still dirty in @param segments. + * + * @return a Future that will finish when all the flushes are complete. 
+ */ + private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) + { + if (segments.isEmpty()) + return Futures.immediateFuture(null); + final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition(); + + // a map of CfId -> forceFlush() to ensure we only queue one flush per cf + final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); + + for (CommitLogSegment segment : segments) + { + for (UUID dirtyCFId : segment.getDirtyCFIDs()) + { + Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); + if (pair == null) + { + // even though we remove the schema entry before a final flush when dropping a CF, + // it's still possible for a writer to race and finish his append after the flush. + logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); + segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + } + else if (!flushes.containsKey(dirtyCFId)) + { + String keyspace = pair.left; + final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); + // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, + // no deadlock possibility since switchLock removal + flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); + } + } + } + + return Futures.allAsList(flushes.values()); + } + + /** + * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. + * Only call this after the AbstractCommitLogService is shut down. 
+ */ + public void stopUnsafe(boolean deleteSegments) + { + logger.debug("CLSM closing and clearing existing commit log segments..."); + + shutdown(); + try + { + awaitTermination(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + for (CommitLogSegment segment : activeSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + activeSegments.clear(); + + size.set(0L); + + logger.trace("CLSM done with closing and clearing existing commit log segments."); + } + + /** + * To be used by tests only. Not safe if mutation slots are being allocated concurrently. + */ + void awaitManagementTasksCompletion() + { + if (availableSegment == null && !atSegmentBufferLimit()) + { + awaitAvailableSegment(allocatingFrom); + } + } + + /** + * Explicitly for use only during resets in unit testing. + */ + private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) + { + try + { + discard(segment, delete); + } + catch (AssertionError ignored) + { + // segment file does not exist + } + } + + /** + * Initiates the shutdown process for the management thread. + */ + public void shutdown() + { + assert !shutdown; + shutdown = true; + + // Release the management thread and delete prepared segment. + // Do not block as another thread may claim the segment (this can happen during unit test initialization). + discardAvailableSegment(); + wakeManager(); + } + + private void discardAvailableSegment() + { + CommitLogSegment next = null; + synchronized (this) + { + next = availableSegment; + availableSegment = null; + } + if (next != null) + next.discard(true); + } + + /** + * Returns when the management thread terminates. 
+ */ + public void awaitTermination() throws InterruptedException + { + managerThread.join(); + managerThread = null; + + for (CommitLogSegment segment : activeSegments) + segment.close(); + + bufferPool.shutdown(); + } + + /** + * @return a read-only collection of the active commit log segments + */ + @VisibleForTesting + public Collection<CommitLogSegment> getActiveSegments() + { + return Collections.unmodifiableCollection(activeSegments); + } + + /** + * @return the current CommitLogPosition of the active segment we're allocating from + */ + CommitLogPosition getCurrentPosition() + { + return allocatingFrom.getCurrentCommitLogPosition(); + } + + /** + * Forces a disk flush on the commit log files that need it. Blocking. + */ + public void sync() throws IOException + { + CommitLogSegment current = allocatingFrom; + for (CommitLogSegment segment : getActiveSegments()) + { + // Do not sync segments that became active after sync started. + if (segment.id > current.id) + return; + segment.sync(); + } + } + + /** + * Used by compressed and encrypted segments to share a buffer pool across the CLSM. + */ + SimpleCachedBufferPool getBufferPool() + { + return bufferPool; + } + + void wakeManager() + { + LockSupport.unpark(managerThread); + } + + /** + * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for + * a buffer to become available. + */ + void notifyBufferFreed() + { + wakeManager(); + } + + /** Read-only access to current segment for subclasses. 
*/ + CommitLogSegment allocatingFrom() + { + return allocatingFrom; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 7b56da3,e5a5887..834aa0d --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@@ -17,18 -17,16 +17,19 @@@ */ package org.apache.cassandra.db.commitlog; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.utils.NoSpamLogger; -import org.apache.cassandra.utils.concurrent.WaitQueue; -import org.slf4j.*; - -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; -import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Timer.Context; + ++import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; +import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.concurrent.WaitQueue; public abstract class AbstractCommitLogService { @@@ -148,8 -160,7 +149,8 @@@ } }; + shutdown = false; - thread = new Thread(runnable, name); + thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name); thread.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/index/sasi/TermIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/sasi/TermIterator.java index 5b08a56,0000000..1ddfcb9 mode 100644,000000..100644 --- 
a/src/java/org/apache/cassandra/index/sasi/TermIterator.java +++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java @@@ -1,218 -1,0 +1,219 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + ++import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; +import org.apache.cassandra.index.sasi.disk.Token; +import org.apache.cassandra.index.sasi.plan.Expression; +import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.io.util.FileUtils; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TermIterator extends RangeIterator<Long, Token> +{ + private static final Logger logger = LoggerFactory.getLogger(TermIterator.class); + + 
private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>() + { + public ExecutorService initialValue() + { + final String currentThread = Thread.currentThread().getName(); + final int concurrencyFactor = DatabaseDescriptor.searchConcurrencyFactor(); + + logger.info("Search Concurrency Factor is set to {} for {}", concurrencyFactor, currentThread); + + return (concurrencyFactor <= 1) + ? MoreExecutors.newDirectExecutorService() + : Executors.newFixedThreadPool(concurrencyFactor, new ThreadFactory() + { + public final AtomicInteger count = new AtomicInteger(); + + public Thread newThread(Runnable task) + { - return new Thread(task, currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }}; ++ return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }}; + } + }); + } + }; + + private final Expression expression; + + private final RangeIterator<Long, Token> union; + private final Set<SSTableIndex> referencedIndexes; + + private TermIterator(Expression e, + RangeIterator<Long, Token> union, + Set<SSTableIndex> referencedIndexes) + { + super(union.getMinimum(), union.getMaximum(), union.getCount()); + + this.expression = e; + this.union = union; + this.referencedIndexes = referencedIndexes; + } + + @SuppressWarnings("resource") + public static TermIterator build(final Expression e, Set<SSTableIndex> perSSTableIndexes) + { + final List<RangeIterator<Long, Token>> tokens = new CopyOnWriteArrayList<>(); + final AtomicLong tokenCount = new AtomicLong(0); + + RangeIterator<Long, Token> memtableIterator = e.index.searchMemtable(e); + if (memtableIterator != null) + { + tokens.add(memtableIterator); + tokenCount.addAndGet(memtableIterator.getCount()); + } + + final Set<SSTableIndex> referencedIndexes = new CopyOnWriteArraySet<>(); + + try + { + final CountDownLatch latch = new CountDownLatch(perSSTableIndexes.size()); + final 
ExecutorService searchExecutor = SEARCH_EXECUTOR.get(); + + for (final SSTableIndex index : perSSTableIndexes) + { + if (e.getOp() == Expression.Op.PREFIX && + index.mode() == OnDiskIndexBuilder.Mode.CONTAINS && !index.hasMarkedPartials()) + throw new UnsupportedOperationException(String.format("The index %s has not yet been upgraded " + + "to support prefix queries in CONTAINS mode. " + + "Wait for compaction or rebuild the index.", + index.getPath())); + + + if (!index.reference()) + { + latch.countDown(); + continue; + } + + // add to referenced right after the reference was acquired, + // that helps to release index if something goes bad inside of the search + referencedIndexes.add(index); + + searchExecutor.submit((Runnable) () -> { + try + { + e.checkpoint(); + + RangeIterator<Long, Token> keyIterator = index.search(e); + if (keyIterator == null) + { + releaseIndex(referencedIndexes, index); + return; + } + + tokens.add(keyIterator); + tokenCount.getAndAdd(keyIterator.getCount()); + } + catch (Throwable e1) + { + releaseIndex(referencedIndexes, index); + + if (logger.isDebugEnabled()) + logger.debug(String.format("Failed search an index %s, skipping.", index.getPath()), e1); + } + finally + { + latch.countDown(); + } + }); + } + + Uninterruptibles.awaitUninterruptibly(latch); + + // checkpoint right away after all indexes complete search because we might have crossed the quota + e.checkpoint(); + + RangeIterator<Long, Token> ranges = RangeUnionIterator.build(tokens); + return ranges == null ? null : new TermIterator(e, ranges, referencedIndexes); + } + catch (Throwable ex) + { + // if execution quota was exceeded while opening indexes or something else happened + // local (yet to be tracked) indexes should be released first before re-throwing exception + referencedIndexes.forEach(TermIterator::releaseQuietly); + + throw ex; + } + } + + protected Token computeNext() + { + try + { + return union.hasNext() ? 
union.next() : endOfData(); + } + finally + { + expression.checkpoint(); + } + } + + protected void performSkipTo(Long nextToken) + { + try + { + union.skipTo(nextToken); + } + finally + { + expression.checkpoint(); + } + } + + public void close() + { + FileUtils.closeQuietly(union); + referencedIndexes.forEach(TermIterator::releaseQuietly); + referencedIndexes.clear(); + } + + private static void releaseIndex(Set<SSTableIndex> indexes, SSTableIndex index) + { + indexes.remove(index); + releaseQuietly(index); + } + + private static void releaseQuietly(SSTableIndex index) + { + try + { + index.release(); + } + catch (Throwable e) + { + logger.error(String.format("Failed to release index %s", index.getPath()), e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 1f47334,a9dfcdc..1843e7b --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@@ -502,31 -506,27 +503,28 @@@ public class OutboundTcpConnection exte { final AtomicInteger version = new AtomicInteger(NO_VERSION); final CountDownLatch versionLatch = new CountDownLatch(1); - new Thread("HANDSHAKE-" + poolReference.endPoint()) - new Thread(NamedThreadFactory.threadLocalDeallocator(() -> ++ Runnable r = () -> { - @Override - public void run() + try { - try - { - logger.info("Handshaking version with {}", poolReference.endPoint()); - version.set(inputStream.readInt()); - } - catch (IOException ex) - { - final String msg = "Cannot handshake version with " + poolReference.endPoint(); - if (logger.isTraceEnabled()) - logger.trace(msg, ex); - else - logger.info(msg); - } - finally - { - //unblock the waiting thread on either success or fail - versionLatch.countDown(); - } + logger.info("Handshaking version 
with {}", poolReference.endPoint()); + version.set(inputStream.readInt()); + } + catch (IOException ex) + { + final String msg = "Cannot handshake version with " + poolReference.endPoint(); + if (logger.isTraceEnabled()) + logger.trace(msg, ex); + else + logger.info(msg); + } + finally + { + //unblock the waiting thread on either success or fail + versionLatch.countDown(); } - }.start(); - }),"HANDSHAKE-" + poolReference.endPoint()).start(); ++ }; ++ new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start(); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java index c98c0fe,61dfa50..904deb3 --- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java +++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java @@@ -59,17 -60,17 +60,14 @@@ public class RoundRobinScheduler implem taskCount = new Semaphore(options.throttle_limit - 1); queues = new NonBlockingHashMap<String, WeightedQueue>(); -- Runnable runnable = new Runnable() ++ Runnable runnable = () -> { -- public void run() ++ while (true) { -- while (true) -- { -- schedule(); -- } ++ schedule(); } }; - Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER"); + Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER"); scheduler.start(); logger.info("Started the RoundRobin Request Scheduler"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- 
diff --cc src/java/org/apache/cassandra/service/StorageService.java index 3f999c2,71cbc35..1247e03 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -631,8 -592,29 +632,8 @@@ public class StorageService extends Not throw new AssertionError(e); } - if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) - { - logger.info("Loading persisted ring state"); - Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens(); - Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds(); - for (InetAddress ep : loadedTokens.keySet()) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - { - // entry has been mistakenly added, delete it - SystemKeyspace.removeEndpoint(ep); - } - else - { - if (loadedHostIds.containsKey(ep)) - tokenMetadata.updateHostId(loadedHostIds.get(ep), ep); - Gossiper.instance.addSavedEndpoint(ep); - } - } - } - // daemon threads, like our executors', continue to run while shutdown hooks are invoked - drainOnShutdown = new Thread(new WrappedRunnable() + drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable() { @Override public void runMayThrow() throws InterruptedException, ExecutionException, IOException @@@ -647,10 -629,10 +648,10 @@@ logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory()); logbackHook.run(); } - }, "StorageServiceShutdownHook"); + }), "StorageServiceShutdownHook"); Runtime.getRuntime().addShutdownHook(drainOnShutdown); - replacing = DatabaseDescriptor.isReplacing(); + replacing = isReplacing(); if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true"))) {