Repository: cassandra Updated Branches: refs/heads/trunk 058faff6f -> e8907c16a
Simplify commit log code patch by Branimir Lambov; reviewed by Ariel Weisberg for CASSANDRA-10202 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8907c16 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8907c16 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8907c16 Branch: refs/heads/trunk Commit: e8907c16abcd84021a39cdaac79b609fcc64a43c Parents: 058faff Author: Branimir Lambov <[email protected]> Authored: Tue May 3 17:36:25 2016 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Jul 20 15:59:38 2016 +0100 ---------------------------------------------------------------------- .../cassandra/config/DatabaseDescriptor.java | 5 + .../AbstractCommitLogSegmentManager.java | 405 +++++++++---------- .../db/commitlog/AbstractCommitLogService.java | 132 +++--- .../db/commitlog/BatchCommitLogService.java | 2 +- .../cassandra/db/commitlog/CommitLog.java | 47 +-- .../db/commitlog/CommitLogArchiver.java | 25 +- .../db/commitlog/CommitLogSegment.java | 25 +- .../commitlog/CommitLogSegmentManagerCDC.java | 13 +- .../CommitLogSegmentManagerStandard.java | 13 +- .../db/commitlog/CompressedSegment.java | 4 +- .../db/commitlog/EncryptedSegment.java | 4 +- .../db/commitlog/FileDirectSegment.java | 6 +- .../db/commitlog/PeriodicCommitLogService.java | 25 +- .../db/commitlog/CommitLogStressTest.java | 27 +- test/unit/org/apache/cassandra/Util.java | 2 +- .../CommitLogSegmentManagerCDCTest.java | 9 +- .../commitlog/CommitLogSegmentManagerTest.java | 10 +- .../cassandra/db/commitlog/CommitLogTest.java | 10 +- .../db/commitlog/CommitLogTestReplayer.java | 2 +- 19 files changed, 352 insertions(+), 414 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 374c64d..4f188bb 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1442,6 +1442,11 @@ public class DatabaseDescriptor return conf.commitlog_max_compression_buffers_in_pool; } + public static void setCommitLogMaxCompressionBuffersPerPool(int buffers) + { + conf.commitlog_max_compression_buffers_in_pool = buffers; + } + public static int getMaxMutationSize() { return conf.max_mutation_size_in_kb * 1024; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index 7ea7439..275d5b3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -22,16 +22,18 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -45,19 +47,27 @@ public abstract class AbstractCommitLogSegmentManager { static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class); - // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation. - private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>(); + /** + * Segment that is ready to be used. The management thread fills this and blocks until consumed. + * + * A single management thread produces this, and consumers are already synchronizing to make sure other work is + * performed atomically with consuming this. Volatile to make sure writes by the management thread become + * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must + * synchronize on 'this'. + */ + private volatile CommitLogSegment availableSegment = null; - /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */ - private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>(); + private final WaitQueue segmentPrepared = new WaitQueue(); - /** Active segments, containing unflushed data */ + /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>(); - /** The segment we are currently allocating commit log records to */ - protected volatile CommitLogSegment allocatingFrom = null; - - private final WaitQueue hasAvailableSegments = new WaitQueue(); + /** + * The segment we are currently allocating commit log records to. + * + * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value. + */ + private volatile CommitLogSegment allocatingFrom = null; final String storageDirectory; @@ -69,15 +79,9 @@ public abstract class AbstractCommitLogSegmentManager */ private final AtomicLong size = new AtomicLong(); - /** - * New segment creation is initially disabled because we'll typically get some "free" segments - * recycled after log replay. - */ - volatile boolean createReserveSegments = false; - private Thread managerThread; - protected volatile boolean run = true; protected final CommitLog commitLog; + private volatile boolean shutdown; private static final SimpleCachedBufferPool bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize()); @@ -95,54 +99,33 @@ public abstract class AbstractCommitLogSegmentManager { public void runMayThrow() throws Exception { - while (run) + while (!shutdown) { try { - Runnable task = segmentManagementTasks.poll(); - if (task == null) + assert availableSegment == null; + logger.debug("No segments in reserve; creating a fresh one"); + availableSegment = createSegment(); + if (shutdown) { - // if we have no more work to do, check if we should create a new segment - if (!atSegmentLimit() && - availableSegments.isEmpty() && - (activeSegments.isEmpty() || createReserveSegments)) - { - logger.trace("No segments in reserve; creating a fresh one"); - // TODO : some error handling in case we fail to create a new segment - availableSegments.add(createSegment()); - hasAvailableSegments.signalAll(); - } - - // flush old Cfs if we're full - long unused = unusedCapacity(); - if (unused < 0) - { - List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); - long spaceToReclaim = 0; - for (CommitLogSegment segment : activeSegments) - { - if (segment == allocatingFrom) - break; - segmentsToRecycle.add(segment); - spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize(); - if (spaceToReclaim + unused >= 0) - break; - } - flushDataFrom(segmentsToRecycle, false); - } - - // Since we're operating on a "null" allocation task, block here for the next task on the - // queue rather than looping, grabbing another null, and repeating the above work. - try - { - task = segmentManagementTasks.take(); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } + // If shutdown() started and finished during segment creation, we are now left with a + // segment that no one will consume. Discard it. + discardAvailableSegment(); + return; } - task.run(); + + segmentPrepared.signalAll(); + Thread.yield(); + + if (availableSegment == null && !atSegmentBufferLimit()) + // Writing threads need another segment now. + continue; + + // Writing threads are not waiting for new segments, we can spend time on other tasks. + // flush old Cfs if we're full + maybeFlushToReclaim(); + + LockSupport.park(); } catch (Throwable t) { @@ -151,27 +134,51 @@ public abstract class AbstractCommitLogSegmentManager return; // sleep some arbitrary period to avoid spamming CL Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + // If we offered a segment, wait for it to be taken before reentering the loop. + // There could be a new segment in next not offered, but only on failure to discard it while + // shutting down-- nothing more can or needs to be done in that case. } - } - } - private boolean atSegmentLimit() - { - return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); + while (availableSegment != null || atSegmentBufferLimit() && !shutdown) + LockSupport.park(); + } } }; - run = true; - + shutdown = false; managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR"); managerThread.start(); + + // for simplicity, ensure the first segment is allocated before continuing + advanceAllocatingFrom(null); } + private boolean atSegmentBufferLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); + } + + private void maybeFlushToReclaim() + { + long unused = unusedCapacity(); + if (unused < 0) + { + long flushingSize = 0; + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + flushingSize += segment.onDiskSize(); + segmentsToRecycle.add(segment); + if (flushingSize + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); + } + } - /** - * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything. - */ - public abstract void shutdown(); /** * Allocate a segment within this CLSM. Should either succeed or throw. @@ -200,102 +207,69 @@ public abstract class AbstractCommitLogSegmentManager */ abstract void discard(CommitLogSegment segment, boolean delete); - /** - * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator - * is working on initial allocation of a CommitLogSegment. - */ - CommitLogSegment allocatingFrom() - { - CommitLogSegment r = allocatingFrom; - if (r == null) - { - advanceAllocatingFrom(null); - r = allocatingFrom; - } - return r; - } - - /** - * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it. - * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled. + * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided. * * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM. */ - protected void advanceAllocatingFrom(CommitLogSegment old) + @DontInline + void advanceAllocatingFrom(CommitLogSegment old) { - while (true) - { - CommitLogSegment next; + while (true) { synchronized (this) { - // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments - // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432 + // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments if (allocatingFrom != old) return; - next = availableSegments.poll(); - if (next != null) - { - allocatingFrom = next; - activeSegments.add(next); - } - } - if (next != null) - { - if (old != null) + // If a segment is ready, take it now, otherwise wait for the management thread to construct it. + if (availableSegment != null) { - // Now we can run the user defined command just after switching to the new commit log. - // (Do this here instead of in the recycle call so we can get a head start on the archive.) - commitLog.archiver.maybeArchive(old); - - // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it - old.discardUnusedTail(); + // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving + // the critical section. + activeSegments.add(allocatingFrom = availableSegment); + availableSegment = null; + break; } - - // request that the CL be synced out-of-band, as we've finished a segment - commitLog.requestExtraSync(); - return; } - // no more segments, so register to receive a signal when not empty - WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time()); + awaitAvailableSegment(old); + } - // trigger the management thread; this must occur after registering - // the signal to ensure we are woken by any new segment creation - wakeManager(); + // Signal the management thread to prepare a new segment. + wakeManager(); - // check if the queue has already been added to before waiting on the signal, to catch modifications - // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change - if (!availableSegments.isEmpty() || allocatingFrom != old) - { - signal.cancel(); - // if we've been beaten, just stop immediately - if (allocatingFrom != old) - return; - // otherwise try again, as there should be an available segment - continue; - } + if (old != null) + { + // Now we can run the user defined command just after switching to the new commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.) + commitLog.archiver.maybeArchive(old); - // can only reach here if the queue hasn't been inserted into - // before we registered the signal, as we only remove items from the queue - // after updating allocatingFrom. Can safely block until we are signalled - // by the allocator that new segments have been published - signal.awaitUninterruptibly(); + // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it + old.discardUnusedTail(); } + + // request that the CL be synced out-of-band, as we've finished a segment + commitLog.requestExtraSync(); } - protected void wakeManager() + void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom) { - // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary) - segmentManagementTasks.add(Runnables.doNothing()); + do + { + WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time()); + if (availableSegment == null && allocatingFrom == currentAllocatingFrom) + prepared.awaitUninterruptibly(); + else + prepared.cancel(); + } + while (availableSegment == null && allocatingFrom == currentAllocatingFrom); } /** * Switch to a new segment, regardless of how much is left in the current one. * - * Flushes any dirty CFs for this segment and any older segments, and then recycles - * the segments + * Flushes any dirty CFs for this segment and any older segments, and then discards the segments */ void forceRecycleAll(Iterable<UUID> droppedCfs) { @@ -307,7 +281,7 @@ public abstract class AbstractCommitLogSegmentManager last.waitForModifications(); // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes - // on the relevant keyspaces to complete + // to complete Keyspace.writeOrder.awaitNewBarrier(); // flush and wait for all CFs that are dirty in segments up-to and including 'last' @@ -326,7 +300,7 @@ public abstract class AbstractCommitLogSegmentManager for (CommitLogSegment segment : activeSegments) { if (segment.isUnused()) - recycleSegment(segment); + archiveAndDiscard(segment); } CommitLogSegment first; @@ -341,33 +315,18 @@ public abstract class AbstractCommitLogSegmentManager } /** - * Indicates that a segment is no longer in use and that it should be recycled. + * Indicates that a segment is no longer in use and that it should be discarded. * * @param segment segment that is no longer in use */ - void recycleSegment(final CommitLogSegment segment) + void archiveAndDiscard(final CommitLogSegment segment) { boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); - if (activeSegments.remove(segment)) - { - // if archiving (command) was not successful then leave the file alone. don't delete or recycle. - discardSegment(segment, archiveSuccess); - } - else - { - logger.warn("segment {} not found in activeSegments queue", segment); - } - } - - /** - * Indicates that a segment file should be deleted. - * - * @param segment segment to be discarded - */ - private void discardSegment(final CommitLogSegment segment, final boolean deleteFile) - { - logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script"); - segmentManagementTasks.add(() -> discard(segment, deleteFile)); + if (!activeSegments.remove(segment)) + return; // already discarded + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. + logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script"); + discard(segment, archiveSuccess); } /** @@ -396,28 +355,6 @@ public abstract class AbstractCommitLogSegmentManager } /** - * @param name the filename to check - * @return true if file is managed by this manager. - */ - public boolean manages(String name) - { - for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments)) - if (segment.getName().equals(name)) - return true; - return false; - } - - /** - * Throws a flag that enables the behavior of keeping at least one spare segment - * available at all times. - */ - void enableReserveSegmentCreation() - { - createReserveSegments = true; - wakeManager(); - } - - /** * Force a flush on all CFs that are still dirty in @param segments. * * @return a Future that will finish when all the flushes are complete. @@ -463,10 +400,7 @@ public abstract class AbstractCommitLogSegmentManager */ public void stopUnsafe(boolean deleteSegments) { - logger.trace("CLSM closing and clearing existing commit log segments..."); - createReserveSegments = false; - - awaitManagementTasksCompletion(); + logger.debug("CLSM closing and clearing existing commit log segments..."); shutdown(); try @@ -478,35 +412,24 @@ public abstract class AbstractCommitLogSegmentManager throw new RuntimeException(e); } - synchronized (this) - { - for (CommitLogSegment segment : activeSegments) - closeAndDeleteSegmentUnsafe(segment, deleteSegments); - activeSegments.clear(); - - for (CommitLogSegment segment : availableSegments) - closeAndDeleteSegmentUnsafe(segment, deleteSegments); - availableSegments.clear(); - } - - allocatingFrom = null; - - segmentManagementTasks.clear(); + for (CommitLogSegment segment : activeSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + activeSegments.clear(); size.set(0L); logger.trace("CLSM done with closing and clearing existing commit log segments."); } - // Used by tests only. + /** + * To be used by tests only. Not safe if mutation slots are being allocated concurrently. + */ void awaitManagementTasksCompletion() { - while (!segmentManagementTasks.isEmpty()) - Thread.yield(); - // The last management task is not yet complete. Wait a while for it. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure - // waiting completes correctly. + if (availableSegment == null && !atSegmentBufferLimit()) + { + awaitAvailableSegment(allocatingFrom); + } } /** @@ -525,18 +448,41 @@ public abstract class AbstractCommitLogSegmentManager } /** + * Initiates the shutdown process for the management thread. + */ + public void shutdown() + { + assert !shutdown; + shutdown = true; + + // Release the management thread and delete prepared segment. + // Do not block as another thread may claim the segment (this can happen during unit test initialization). + discardAvailableSegment(); + wakeManager(); + } + + private void discardAvailableSegment() + { + CommitLogSegment next = null; + synchronized (this) { + next = availableSegment; + availableSegment = null; + } + if (next != null) + next.discard(true); + } + + /** * Returns when the management thread terminates. */ public void awaitTermination() throws InterruptedException { managerThread.join(); + managerThread = null; for (CommitLogSegment segment : activeSegments) segment.close(); - for (CommitLogSegment segment : availableSegments) - segment.close(); - bufferPool.shutdown(); } @@ -554,18 +500,19 @@ public abstract class AbstractCommitLogSegmentManager */ CommitLogPosition getCurrentPosition() { - return allocatingFrom().getCurrentCommitLogPosition(); + return allocatingFrom.getCurrentCommitLogPosition(); } /** * Forces a disk flush on the commit log files that need it. Blocking. */ - public void sync(boolean syncAllSegments) throws IOException + public void sync() throws IOException { - CommitLogSegment current = allocatingFrom(); + CommitLogSegment current = allocatingFrom; for (CommitLogSegment segment : getActiveSegments()) { - if (!syncAllSegments && segment.id > current.id) + // Do not sync segments that became active after sync started. + if (segment.id > current.id) return; segment.sync(); } @@ -578,5 +525,25 @@ public abstract class AbstractCommitLogSegmentManager { return bufferPool; } + + void wakeManager() + { + LockSupport.unpark(managerThread); + } + + /** + * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for + * a buffer to become available. + */ + void notifyBufferFreed() + { + wakeManager(); + } + + /** Read-only access to current segment for subclasses. */ + CommitLogSegment allocatingFrom() + { + return allocatingFrom; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 0ba4f55..7b56da3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -17,15 +17,18 @@ */ package org.apache.cassandra.db.commitlog; -import org.apache.cassandra.utils.NoSpamLogger; -import org.apache.cassandra.utils.concurrent.WaitQueue; -import org.slf4j.*; - -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; +import com.codahale.metrics.Timer.Context; + +import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; +import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.concurrent.WaitQueue; public abstract class AbstractCommitLogService { @@ -41,11 +44,10 @@ public abstract class AbstractCommitLogService // signal that writers can wait on to be notified of a completed sync protected final WaitQueue syncComplete = new WaitQueue(); - protected final Semaphore haveWork = new Semaphore(1); final CommitLog commitLog; private final String name; - private final long pollIntervalMillis; + private final long pollIntervalNanos; private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class); @@ -59,14 +61,15 @@ public abstract class AbstractCommitLogService { this.commitLog = commitLog; this.name = name; - this.pollIntervalMillis = pollIntervalMillis; + this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS); } // Separated into individual method to ensure relevant objects are constructed before this is started. void start() { - if (pollIntervalMillis < 1) - throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis)); + if (pollIntervalNanos < 1) + throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", + pollIntervalNanos * 1e-6)); Runnable runnable = new Runnable() { @@ -78,26 +81,25 @@ public abstract class AbstractCommitLogService int lagCount = 0; int syncCount = 0; - boolean run = true; - while (run) + while (true) { + // always run once after shutdown signalled + boolean shutdownRequested = shutdown; + try { - // always run once after shutdown signalled - run = !shutdown; - // sync and signal - long syncStarted = System.currentTimeMillis(); + long syncStarted = System.nanoTime(); // This is a target for Byteman in CommitLogSegmentManagerTest - commitLog.sync(shutdown); + commitLog.sync(); lastSyncedAt = syncStarted; syncComplete.signalAll(); // sleep any time we have left before the next one is due - long now = System.currentTimeMillis(); - long sleep = syncStarted + pollIntervalMillis - now; - if (sleep < 0) + long now = System.nanoTime(); + long wakeUpAt = syncStarted + pollIntervalNanos; + if (wakeUpAt < now) { // if we have lagged noticeably, update our lag counter if (firstLagAt == 0) @@ -105,7 +107,7 @@ public abstract class AbstractCommitLogService firstLagAt = now; totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; } - syncExceededIntervalBy -= sleep; + syncExceededIntervalBy += now - wakeUpAt; lagCount++; } syncCount++; @@ -114,30 +116,25 @@ public abstract class AbstractCommitLogService if (firstLagAt > 0) { //Only reset the lag tracking if it actually logged this time - boolean logged = NoSpamLogger.log( - logger, - NoSpamLogger.Level.WARN, - 5, - TimeUnit.MINUTES, - "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", - syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount)); + boolean logged = NoSpamLogger.log(logger, + NoSpamLogger.Level.WARN, + 5, + TimeUnit.MINUTES, + "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", + syncCount, + String.format("%.2f", (now - firstLagAt) * 1e-9d), + String.format("%.2f", totalSyncDuration * 1e-6d / syncCount), + lagCount, + String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount)); if (logged) firstLagAt = 0; } - // if we have lagged this round, we probably have work to do already so we don't sleep - if (sleep < 0 || !run) - continue; + if (shutdownRequested) + return; - try - { - haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS); - haveWork.drainPermits(); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } + if (wakeUpAt > now) + LockSupport.parkNanos(wakeUpAt - now); } catch (Throwable t) { @@ -145,19 +142,13 @@ public abstract class AbstractCommitLogService break; // sleep for full poll-interval after an error, so we don't spam the log file - try - { - haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } + LockSupport.parkNanos(pollIntervalNanos); } } } }; + shutdown = false; thread = new Thread(runnable, name); thread.start(); } @@ -174,42 +165,43 @@ public abstract class AbstractCommitLogService protected abstract void maybeWaitForSync(Allocation alloc); /** - * Sync immediately, but don't block for the sync to cmplete + * Request an additional sync cycle without blocking. */ - public WaitQueue.Signal requestExtraSync() + public void requestExtraSync() { - WaitQueue.Signal signal = syncComplete.register(); - haveWork.release(1); - return signal; + LockSupport.unpark(thread); } public void shutdown() { shutdown = true; - haveWork.release(1); + requestExtraSync(); } /** - * FOR TESTING ONLY + * Request sync and wait until the current state is synced. + * + * Note: If a sync is in progress at the time of this request, the call will return after both it and a cycle + * initiated immediately afterwards complete. */ - public void restartUnsafe() + public void syncBlocking() { - while (haveWork.availablePermits() < 1) - haveWork.release(); + long requestTime = System.nanoTime(); + requestExtraSync(); + awaitSyncAt(requestTime, null); + } - while (haveWork.availablePermits() > 1) + void awaitSyncAt(long syncTime, Context context) + { + do { - try - { - haveWork.acquire(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + WaitQueue.Signal signal = context != null ? syncComplete.register(context) : syncComplete.register(); + if (lastSyncedAt < syncTime) + signal.awaitUninterruptibly(); + else + signal.cancel(); } - shutdown = false; - start(); + while (lastSyncedAt < syncTime); } public void awaitTermination() throws InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java index ceb5d64..4edfa34 100644 --- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java @@ -30,7 +30,7 @@ class BatchCommitLogService extends AbstractCommitLogService { // wait until record has been safely persisted to disk pending.incrementAndGet(); - haveWork.release(); + requestExtraSync(); alloc.awaitDiskSync(commitLog.metrics.waitingOnCommit); pending.decrementAndGet(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index b66221c..d76b9cb 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -116,8 +116,8 @@ public class CommitLog implements CommitLogMBean CommitLog start() { - executor.start(); segmentManager.start(); + executor.start(); return this; } @@ -129,22 +129,11 @@ public class CommitLog implements CommitLogMBean */ public int recoverSegmentsOnDisk() throws IOException { - // If createReserveSegments is already flipped, the CLSM is running and recovery has already taken place. - if (segmentManager.createReserveSegments) - return 0; - - FilenameFilter unmanagedFilesFilter = new FilenameFilter() - { - public boolean accept(File dir, String name) - { - // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes) - // until after recover was finished. this turns out to be fragile; it is less error-prone to go - // ahead and allow writes before recover, and just skip active segments when we do. - return CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name); - } - }; + FilenameFilter unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name); // submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904 + // The files may have already been archived by normal CommitLog operation. This may cause errors in this + // archiving pass, which we should not treat as serious. for (File file : new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter)) { archiver.maybeArchive(file.getPath(), file.getName()); @@ -154,6 +143,7 @@ public class CommitLog implements CommitLogMBean assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore"; archiver.maybeRestoreArchive(); + // List the files again as archiver may have added segments. File[] files = new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter); int replayed = 0; if (files.length == 0) @@ -171,7 +161,6 @@ public class CommitLog implements CommitLogMBean segmentManager.handleReplayedSegment(f); } - segmentManager.enableReserveSegmentCreation(); return replayed; } @@ -231,9 +220,9 @@ public class CommitLog implements CommitLogMBean /** * Forces a disk flush on the commit log files that need it. Blocking. */ - public void sync(boolean syncAllSegments) throws IOException + public void sync() throws IOException { - segmentManager.sync(syncAllSegments); + segmentManager.sync(); } /** @@ -315,8 +304,8 @@ public class CommitLog implements CommitLogMBean if (segment.isUnused()) { - logger.trace("Commit log segment {} is unused", segment); - segmentManager.recycleSegment(segment); + logger.debug("Commit log segment {} is unused", segment); + segmentManager.archiveAndDiscard(segment); } else { @@ -455,23 +444,7 @@ public class CommitLog implements CommitLogMBean */ public int restartUnsafe() throws IOException { - segmentManager.start(); - executor.restartUnsafe(); - try - { - return recoverSegmentsOnDisk(); - } - catch (FSWriteError e) - { - // Workaround for a class of races that keeps showing up on Windows tests. - // stop/start/reset path on Windows with segment deletion is very touchy/brittle - // and the timing keeps getting screwed up. Rather than chasing our tail further - // or rewriting the CLSM, just report that we didn't recover anything back up - // the chain. This will silence most intermittent test failures on Windows - // and appropriately fail tests that expected segments to be recovered that - // were not. - return 0; - } + return start().recoverSegmentsOnDisk(); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index 044f2db..1abdd79 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Strings; +import com.google.common.base.Throwables; public class CommitLogArchiver { @@ -151,22 +152,32 @@ public class CommitLogArchiver /** * Differs from the above because it can be used on any file, rather than only - * managed commit log segments (and thus cannot call waitForFinalSync). + * managed commit log segments (and thus cannot call waitForFinalSync), and in + * the treatment of failures. * - * Used to archive files present in the commit log directory at startup (CASSANDRA-6904) + * Used to archive files present in the commit log directory at startup (CASSANDRA-6904). + * Since the files being already archived by normal operation could cause subsequent + * hard-linking or other operations to fail, we should not throw errors on failure */ public void maybeArchive(final String path, final String name) { if (Strings.isNullOrEmpty(archiveCommand)) return; - archivePending.put(name, executor.submit(new WrappedRunnable() + archivePending.put(name, executor.submit(new Runnable() { - protected void runMayThrow() throws IOException + public void run() { - String command = NAME.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(name)); - command = PATH.matcher(command).replaceAll(Matcher.quoteReplacement(path)); - exec(command); + try + { + String command = NAME.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(name)); + command = PATH.matcher(command).replaceAll(Matcher.quoteReplacement(path)); + exec(command); + } + catch (IOException e) + { + logger.warn("Archiving file {} failed, file may have already been archived.", name, e); + } } })); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index a1158be..eb9759e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; import org.cliffc.high_scale_lib.NonBlockingHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; import org.apache.cassandra.config.*; @@ -38,6 +36,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.commitlog.CommitLog.Configuration; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -51,8 +50,6 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; */ public abstract class CommitLogSegment { - private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class); - private final static long idBase; private CDCState cdcState = CDCState.PERMITTED; @@ -117,14 +114,13 @@ public abstract class CommitLogSegment ByteBuffer buffer; private volatile boolean headerWritten; - final CommitLog commitLog; public final CommitLogDescriptor descriptor; - static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) + static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { Configuration config = commitLog.configuration; - CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager, onClose) - : config.useCompression() ? new CompressedSegment(commitLog, manager, onClose) + CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager) + : config.useCompression() ? new CompressedSegment(commitLog, manager) : new MemoryMappedSegment(commitLog, manager); segment.writeLogHeader(); return segment; @@ -152,7 +148,6 @@ public abstract class CommitLogSegment */ CommitLogSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { - this.commitLog = commitLog; this.manager = manager; id = getNextId(); @@ -370,6 +365,18 @@ public abstract class CommitLogSegment } /** + * Discards a segment file when the log no longer requires it. The file may be left on disk if the archive script + * requires it. (Potentially blocking operation) + */ + void discard(boolean deleteFile) + { + close(); + if (deleteFile) + FileUtils.deleteWithConfirm(logFile); + manager.addSize(-onDiskSize()); + } + + /** * @return the current CommitLogPosition for this log segment */ public CommitLogPosition getCurrentCommitLogPosition() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java index 306cec8..a91384f 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java @@ -53,8 +53,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager @Override void start() { - super.start(); cdcSizeTracker.start(); + super.start(); } public void discard(CommitLogSegment segment, boolean delete) @@ -78,9 +78,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager */ public void shutdown() { - run = false; cdcSizeTracker.shutdown(); - wakeManager(); + super.shutdown(); } /** @@ -103,7 +102,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { // Failed to allocate, so move to a new segment with enough room if possible. advanceAllocatingFrom(segment); - segment = allocatingFrom; + segment = allocatingFrom(); throwIfForbidden(mutation, segment); } @@ -146,7 +145,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager */ public CommitLogSegment createSegment() { - CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this, () -> wakeManager()); + CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this); cdcSizeTracker.processNewSegment(segment); return segment; } @@ -179,6 +178,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager */ public void start() { + size = 0; + unflushedCDCSize = 0; cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy()); } @@ -245,7 +246,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { rateLimiter.acquire(); calculateSize(); - CommitLogSegment allocatingFrom = segmentManager.allocatingFrom; + CommitLogSegment allocatingFrom = segmentManager.allocatingFrom(); if (allocatingFrom.getCDCState() == CDCState.FORBIDDEN) processNewSegment(allocatingFrom); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java index 333077c..86e886b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java @@ -39,15 +39,6 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan } /** - * Initiates the shutdown process for the management thread. - */ - public void shutdown() - { - run = false; - wakeManager(); - } - - /** * Reserve space in the current segment for the provided mutation or, if there isn't space available, * create a new segment. allocate() is blocking until allocation succeeds as it waits on a signal in advanceAllocatingFrom * @@ -64,7 +55,7 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan { // failed to allocate, so move to a new segment with enough room advanceAllocatingFrom(segment); - segment = allocatingFrom; + segment = allocatingFrom(); } return alloc; @@ -84,6 +75,6 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan public CommitLogSegment createSegment() { - return CommitLogSegment.createSegment(commitLog, this, () -> wakeManager()); + return CommitLogSegment.createSegment(commitLog, this); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index 2e46028..288b766 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -42,9 +42,9 @@ public class CompressedSegment extends FileDirectSegment /** * Constructs a new segment file. */ - CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) + CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { - super(commitLog, manager, onClose); + super(commitLog, manager); this.compressor = commitLog.configuration.getCompressor(); manager.getBufferPool().setPreferredReusableBufferType(compressor.preferredBufferType()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java index 103351e..4ca1ede 100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java @@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment private final EncryptionContext encryptionContext; private final Cipher cipher; - public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) + public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { - super(commitLog, manager, onClose); + super(commitLog, manager); this.encryptionContext = commitLog.configuration.getEncryptionContext(); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java index d4160e4..55084be 100644 --- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java @@ -29,12 +29,10 @@ import org.apache.cassandra.io.FSWriteError; public abstract class FileDirectSegment extends CommitLogSegment { volatile long lastWrittenPos = 0; - private final Runnable onClose; - FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose) + FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { super(commitLog, manager); - this.onClose = onClose; } @Override @@ -62,7 +60,7 @@ public abstract class FileDirectSegment extends CommitLogSegment } finally { - onClose.run(); + manager.notifyBufferFreed(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java index 86a248b..bde832b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java @@ -18,11 +18,10 @@ package org.apache.cassandra.db.commitlog; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.utils.concurrent.WaitQueue; class PeriodicCommitLogService extends AbstractCommitLogService { - private static final int blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5); + private static final long blockWhenSyncLagsNanos = (long) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5e6); public PeriodicCommitLogService(final CommitLog commitLog) { @@ -31,28 +30,12 @@ class PeriodicCommitLogService extends AbstractCommitLogService protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) { - if (waitForSyncToCatchUp(Long.MAX_VALUE)) + long expectedSyncTime = System.nanoTime() - blockWhenSyncLagsNanos; + if (lastSyncedAt < expectedSyncTime) { - // wait until periodic sync() catches up with its schedule - long started = System.currentTimeMillis(); pending.incrementAndGet(); - while (waitForSyncToCatchUp(started)) - { - WaitQueue.Signal signal = syncComplete.register(commitLog.metrics.waitingOnCommit.time()); - if (waitForSyncToCatchUp(started)) - signal.awaitUninterruptibly(); - else - signal.cancel(); - } + awaitSyncAt(expectedSyncTime, commitLog.metrics.waitingOnCommit.time()); pending.decrementAndGet(); } } - - /** - * @return true if sync is currently lagging behind inserts - */ - private boolean waitForSyncToCatchUp(long started) - { - return started > lastSyncedAt + blockWhenSyncLagsMillis; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 06c252f..b86a15b 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -29,9 +29,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.RateLimiter; -import org.junit.*; -import org.apache.cassandra.*; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.config.Config.CommitLogSync; import org.apache.cassandra.config.*; import org.apache.cassandra.db.Mutation; @@ -97,6 +103,7 @@ public class CommitLogStressTest initialize(); CommitLogStressTest tester = new CommitLogStressTest(); + tester.cleanDir(); tester.testFixedSize(); } catch (Throwable e) @@ -130,12 +137,13 @@ public class CommitLogStressTest SchemaLoader.loadSchema(); SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour + + CommitLog.instance.stopUnsafe(true); } @Before public void cleanDir() throws IOException { - CommitLog.instance.stopUnsafe(true); File dir = new File(location); if (dir.isDirectory()) { @@ -209,8 +217,6 @@ public class CommitLogStressTest { DatabaseDescriptor.setCommitLogSync(sync); CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start(); - // Need to enable reserve segment creation as close to test start as possible to minimize race - commitLog.segmentManager.enableReserveSegmentCreation(); testLog(commitLog); assert !failed; } @@ -307,17 +313,12 @@ public class CommitLogStressTest private void verifySizes(CommitLog commitLog) { // Complete anything that's still left to write. - commitLog.executor.requestExtraSync().awaitUninterruptibly(); - // One await() does not suffice as we may be signalled when an ongoing sync finished. Request another - // (which shouldn't write anything) to make sure the first we triggered completes. - // FIXME: The executor should give us a chance to await completion of the sync we requested. - commitLog.executor.requestExtraSync().awaitUninterruptibly(); - - // Wait for any pending deletes or segment allocations to complete. + commitLog.executor.syncBlocking(); + // Wait for any concurrent segment allocations to complete. commitLog.segmentManager.awaitManagementTasksCompletion(); long combinedSize = 0; - for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()) + for (File f : new File(commitLog.segmentManager.storageDirectory).listFiles()) combinedSize += f.length(); Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index df83ff1..e5c1831 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -530,7 +530,7 @@ public class Util public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds) { long now = System.currentTimeMillis(); - while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds)) + while (System.currentTimeMillis() < now + (1000 * timeoutInSeconds)) { if (s.get().equals(expected)) break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java index e308a2f..68ce57d 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java @@ -51,8 +51,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester @Before public void before() throws IOException { - // disable reserve segment to get more deterministic allocation/testing of CDC boundary states - CommitLog.instance.forceRecycleAllSegments(); + CommitLog.instance.resetUnsafe(true); for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) FileUtils.deleteWithConfirm(f); } @@ -120,7 +119,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester for (int i = 0; i < 8; i++) { new RowUpdateBuilder(currentTableMetadata(), 0, i) - .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) // fit 3 in a segment .build().apply(); } @@ -136,7 +135,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester for (int i = 0; i < 8; i++) { new RowUpdateBuilder(currentTableMetadata(), 0, i) - .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) .build().apply(); } // 4 total again, 3 CONTAINS, 1 in waiting PERMITTED @@ -215,6 +214,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester private void expectCurrentCDCState(CDCState state) { Assert.assertEquals("Received unexpected CDCState on current allocatingFrom segment.", - state, CommitLog.instance.segmentManager.allocatingFrom.getCDCState()); + state, CommitLog.instance.segmentManager.allocatingFrom().getCDCState()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java index b777389..397a8eb 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.db.commitlog; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Random; import java.util.concurrent.Semaphore; import javax.naming.ConfigurationException; @@ -51,6 +52,7 @@ import org.jboss.byteman.contrib.bmunit.BMUnitRunner; public class CommitLogSegmentManagerTest { //Block commit log service from syncing + @SuppressWarnings("unused") private static final Semaphore allowSync = new Semaphore(0); private static final String KEYSPACE1 = "CommitLogTest"; @@ -66,6 +68,7 @@ public class CommitLogSegmentManagerTest DatabaseDescriptor.setCommitLogSegmentSize(1); DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic); DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000); + DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3); SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), @@ -109,11 +112,14 @@ public class CommitLogSegmentManagerTest } Thread.sleep(1000); - // Should only be able to create 3 segments (not 7) because it blocks waiting for truncation that never comes. + // Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes Assert.assertEquals(3, clsm.getActiveSegments().size()); - clsm.getActiveSegments().forEach(segment -> clsm.recycleSegment(segment)); + // Discard the currently active segments so allocation can continue. + // Take snapshot of the list, otherwise this will also discard newly allocated segments. + new ArrayList<>(clsm.getActiveSegments()).forEach( clsm::archiveAndDiscard ); + // The allocated count should reach the limit again. Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 79051e0..4bc5f6b 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -277,7 +277,7 @@ public class CommitLogTest // "Flush": this won't delete anything UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); - CommitLog.instance.sync(true); + CommitLog.instance.sync(); CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition()); assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); @@ -594,7 +594,7 @@ public class CommitLogTest cellCount += 1; CommitLog.instance.add(rm2); - CommitLog.instance.sync(true); + CommitLog.instance.sync(); SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata); List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); @@ -631,7 +631,7 @@ public class CommitLogTest } } - CommitLog.instance.sync(true); + CommitLog.instance.sync(); SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata); List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); @@ -661,6 +661,10 @@ public class CommitLogTest @Override public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) { + // Filter out system writes that could flake the test. + if (!KEYSPACE1.equals(m.getKeyspaceName())) + return; + if (entryLocation <= filterPosition.position) { // Skip over this mutation. http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index 9a22b04..7f43378 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -40,7 +40,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException { super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create()); - CommitLog.instance.sync(true); + CommitLog.instance.sync(); this.processor = processor; commitLogReader = new CommitLogTestReader();
