Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c000827a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c000827a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c000827a Branch: refs/heads/trunk Commit: c000827afd48b2dc9901c530d5b4118107114c8d Parents: e646e50 db788fe Author: Jason Brown <jasedbr...@gmail.com> Authored: Wed Dec 13 19:51:53 2017 -0800 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Wed Dec 13 19:52:31 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 8 - .../org/apache/cassandra/config/Config.java | 1 - .../cassandra/config/DatabaseDescriptor.java | 10 - .../db/commitlog/AbstractCommitLogService.java | 215 +++++++++++-------- .../db/commitlog/PeriodicCommitLogService.java | 3 +- .../commitlog/AbstractCommitLogServiceTest.java | 166 ++++++++++++++ .../commitlog/CommitLogChainedMarkersTest.java | 1 - .../CommitLogSegmentBackpressureTest.java | 8 +- 9 files changed, 295 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 60794f0,ee90a67..aa71620 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,15 -1,5 +1,16 @@@ -3.0.16 +3.11.2 + * Prevent continuous schema exchange between 3.0 and 3.11 nodes (CASSANDRA-14109) + * Fix imbalanced disks when replacing node with same address with JBOD (CASSANDRA-14084) + * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948) + * Remove OpenJDK log warning (CASSANDRA-13916) + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079) + * Cache disk boundaries (CASSANDRA-13215) + * Add asm jar to build.xml for maven builds (CASSANDRA-11193) + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897) + * Update jackson JSON jars (CASSANDRA-13949) + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930) +Merged from 3.0: + * Improve commit log chain marker updating (CASSANDRA-14108) * Extra range tombstone bound creates double rows (CASSANDRA-14008) * Fix SStable ordering by max timestamp in SinglePartitionReadCommand (CASSANDRA-14010) * Accept role names containing forward-slash (CASSANDRA-14088) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/conf/cassandra.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Config.java index 5fe752e,64d41bb..a01203c --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -198,9 -190,8 +198,8 @@@ public class Confi public String commitlog_directory; public Integer commitlog_total_space_in_mb; public CommitLogSync commitlog_sync; - public Double commitlog_sync_batch_window_in_ms; - public Integer commitlog_sync_period_in_ms; + public double commitlog_sync_batch_window_in_ms = Double.NaN; + public int commitlog_sync_period_in_ms; - public int commitlog_marker_period_in_ms = 0; public int commitlog_segment_size_in_mb = 32; public ParameterizedClass commitlog_compression; public int commitlog_max_compression_buffers_in_pool = 3; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 0410650,829530d..4571b54 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@@ -17,21 -17,17 +17,21 @@@ */ package org.apache.cassandra.db.commitlog; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Timer.Context; + import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Config; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.WaitQueue; -import org.slf4j.*; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractCommitLogService { @@@ -54,13 -57,13 +60,13 @@@ /** * The duration between syncs to disk. */ - private final long syncIntervalNanos; - final long syncIntervalMillis; ++ final long syncIntervalNanos; /** * The duration between updating the chained markers in the the commit log file. This value should be - * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}. + * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}. */ - private final long markerIntervalNanos; - final long markerIntervalMillis; ++ final long markerIntervalNanos; /** * A flag that callers outside of the sync thread can use to signal they want the commitlog segments @@@ -92,121 -97,156 +100,142 @@@ { this.commitLog = commitLog; this.name = name; - this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS); - // if we are not using periodic mode, or we using compression/encryption, we shouldn't update the chained markers - // faster than the sync interval - if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression() || commitLog.configuration.useEncryption()) - markerIntervalMillis = syncIntervalMillis; ++ final long markerIntervalMillis; + if (markHeadersFaster && syncIntervalMillis > DEFAULT_MARKER_INTERVAL_MILLIS) + { + markerIntervalMillis = DEFAULT_MARKER_INTERVAL_MILLIS; + long modulo = syncIntervalMillis % markerIntervalMillis; + if (modulo != 0) + { + // quantize syncIntervalMillis to a multiple of markerIntervalMillis + syncIntervalMillis -= modulo; - // apply basic bounds checking on the marker interval - if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis) + if (modulo >= markerIntervalMillis / 2) + syncIntervalMillis += markerIntervalMillis; + } + logger.debug("Will update the commitlog markers every {}ms and flush every {}ms", markerIntervalMillis, syncIntervalMillis); + } + else { - logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval", - markerIntervalMillis, syncIntervalMillis); markerIntervalMillis = syncIntervalMillis; } -- + assert syncIntervalMillis % markerIntervalMillis == 0; - this.syncIntervalMillis = syncIntervalMillis; + this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS); ++ this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS); } // Separated into individual method to ensure relevant objects are constructed before this is started. void start() { - if (syncIntervalMillis < 1) - throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", - syncIntervalMillis)); + if (syncIntervalNanos < 1) + throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", + syncIntervalNanos * 1e-6)); + shutdown = false; + Runnable runnable = new SyncRunnable(new Clock()); - thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name); ++ thread = NamedThreadFactory.createThread(runnable, name); + thread.start(); + } - Runnable runnable = new Runnable() + class SyncRunnable implements Runnable + { - final Clock clock; - long firstLagAt = 0; - long totalSyncDuration = 0; // total time spent syncing since firstLagAt - long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt - int lagCount = 0; - int syncCount = 0; ++ private final Clock clock; ++ private long firstLagAt = 0; ++ private long totalSyncDuration = 0; // total time spent syncing since firstLagAt ++ private long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt ++ private int lagCount = 0; ++ private int syncCount = 0; + + SyncRunnable(Clock clock) { - public void run() + this.clock = clock; + } + + public void run() + { + while (true) { - long firstLagAt = 0; - long totalSyncDuration = 0; // total time spent syncing since firstLagAt - long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt - int lagCount = 0; - int syncCount = 0; + if (!sync()) + break; + } + } + + boolean sync() + { ++ // always run once after shutdown signalled ++ boolean shutdownRequested = shutdown; + - while (true) + try + { - // always run once after shutdown signalled - boolean run = !shutdown; - + // sync and signal - long pollStarted = clock.currentTimeMillis(); - if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested) ++ long pollStarted = clock.nanoTime(); ++ if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested) + { + // in this branch, we want to flush the commit log to disk - commitLog.sync(shutdown, true); ++ commitLog.sync(true); + syncRequested = false; + lastSyncedAt = pollStarted; + syncComplete.signalAll(); ++ syncCount++; + } + else { - // always run once after shutdown signalled - boolean shutdownRequested = shutdown; + // in this branch, just update the commit log sync headers - commitLog.sync(false, false); ++ commitLog.sync(false); + } - try + // sleep any time we have left before the next one is due - long now = clock.currentTimeMillis(); - long sleep = pollStarted + markerIntervalMillis - now; - if (sleep < 0) ++ long now = clock.nanoTime(); ++ long wakeUpAt = pollStarted + markerIntervalNanos; ++ if (wakeUpAt < now) + { + // if we have lagged noticeably, update our lag counter + if (firstLagAt == 0) { - // sync and signal - long pollStarted = System.nanoTime(); - if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested) - { - // in this branch, we want to flush the commit log to disk - commitLog.sync(true); - syncRequested = false; - lastSyncedAt = pollStarted; - syncComplete.signalAll(); - } - else - { - // in this branch, just update the commit log sync headers - commitLog.sync(false); - } - - // sleep any time we have left before the next one is due - long now = System.nanoTime(); - long wakeUpAt = pollStarted + markerIntervalNanos; - if (wakeUpAt < now) - { - // if we have lagged noticeably, update our lag counter - if (firstLagAt == 0) - { - firstLagAt = now; - totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; - } - syncExceededIntervalBy += now - wakeUpAt; - lagCount++; - } - syncCount++; - totalSyncDuration += now - pollStarted; - - if (firstLagAt > 0) - { - //Only reset the lag tracking if it actually logged this time - boolean logged = NoSpamLogger.log(logger, - NoSpamLogger.Level.WARN, - 5, - TimeUnit.MINUTES, - "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", - syncCount, - String.format("%.2f", (now - firstLagAt) * 1e-9d), - String.format("%.2f", totalSyncDuration * 1e-6d / syncCount), - lagCount, - String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount)); - if (logged) - firstLagAt = 0; - } - - if (shutdownRequested) - return; - - if (wakeUpAt > now) - LockSupport.parkNanos(wakeUpAt - now); + firstLagAt = now; + totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; } - catch (Throwable t) - { - if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) - break; - syncExceededIntervalBy -= sleep; ++ syncExceededIntervalBy += now - wakeUpAt; + lagCount++; + } - syncCount++; + totalSyncDuration += now - pollStarted; - // sleep for full poll-interval after an error, so we don't spam the log file - LockSupport.parkNanos(markerIntervalNanos); - } + if (firstLagAt > 0) + { + //Only reset the lag tracking if it actually logged this time - boolean logged = NoSpamLogger.log( - logger, - NoSpamLogger.Level.WARN, - 5, - TimeUnit.MINUTES, - "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", - syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount)); ++ boolean logged = NoSpamLogger.log(logger, ++ NoSpamLogger.Level.WARN, ++ 5, ++ TimeUnit.MINUTES, ++ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", ++ syncCount, ++ String.format("%.2f", (now - firstLagAt) * 1e-9d), ++ String.format("%.2f", totalSyncDuration * 1e-6d / syncCount), ++ lagCount, ++ String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount)); + if (logged) + firstLagAt = 0; } + - if (!run) ++ if (shutdownRequested) + return false; + - // if we have lagged this round, we probably have work to do already so we don't sleep - if (sleep < 0) - return true; - - try - { - haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS); - haveWork.drainPermits(); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } ++ if (wakeUpAt > now) ++ LockSupport.parkNanos(wakeUpAt - now); } - }; + catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) + return false; - shutdown = false; - thread = NamedThreadFactory.createThread(runnable, name); - thread.start(); + // sleep for full poll-interval after an error, so we don't spam the log file - try - { - haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } ++ LockSupport.parkNanos(markerIntervalNanos); + } ++ + return true; + } } - /** * Block for @param alloc to be sync'd as necessary, and handle bookkeeping */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java index bf6eb49,7a09de0..efd3394 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java @@@ -25,7 -26,8 +25,8 @@@ class PeriodicCommitLogService extends public PeriodicCommitLogService(final CommitLog commitLog) { - super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod()); + super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), - !commitLog.configuration.useCompression()); ++ !(commitLog.configuration.useCompression() || commitLog.configuration.useEncryption())); } protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java index 0000000,5a46e5f..18f15fa mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java @@@ -1,0 -1,176 +1,166 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db.commitlog; + + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + + import org.junit.Assert; + import org.junit.BeforeClass; -import org.junit.Ignore; + import org.junit.Test; + + import org.apache.cassandra.config.Config; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.utils.Clock; + import org.apache.cassandra.utils.FreeRunningClock; + + import static org.apache.cassandra.db.commitlog.AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS; + + public class AbstractCommitLogServiceTest + { + @BeforeClass + public static void before() + { ++ DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000); + } + + @Test + public void testConstructorSyncIsQuantized() + { + long syncTimeMillis = 10 * 1000; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); - Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); - Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis); ++ Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos); ++ Assert.assertEquals(toNanos(syncTimeMillis), commitLogService.syncIntervalNanos); + } + + @Test + public void testConstructorSyncEqualsMarkerDefault() + { + long syncTimeMillis = 100; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); - Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); - Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis); - Assert.assertEquals(commitLogService.markerIntervalMillis, commitLogService.syncIntervalMillis); ++ Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos); ++ Assert.assertEquals(toNanos(syncTimeMillis), commitLogService.syncIntervalNanos); ++ Assert.assertEquals(commitLogService.markerIntervalNanos, commitLogService.syncIntervalNanos); + } + + @Test + public void testConstructorSyncShouldRoundUp() + { + long syncTimeMillis = 151; + long expectedMillis = 200; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); - Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); - Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis); ++ Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos); ++ Assert.assertEquals(toNanos(expectedMillis), commitLogService.syncIntervalNanos); + } + + @Test + public void testConstructorSyncShouldRoundDown() + { + long syncTimeMillis = 121; + long expectedMillis = 100; + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); - Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis); - Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis); ++ Assert.assertEquals(toNanos(DEFAULT_MARKER_INTERVAL_MILLIS), commitLogService.markerIntervalNanos); ++ Assert.assertEquals(toNanos(expectedMillis), commitLogService.syncIntervalNanos); + } + + @Test + public void testConstructorSyncTinyValue() + { + long syncTimeMillis = 10; - long expectedNanos = syncTimeMillis; ++ long expectedNanos = toNanos(syncTimeMillis); + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); - Assert.assertEquals(expectedNanos, commitLogService.markerIntervalMillis); - Assert.assertEquals(expectedNanos, commitLogService.syncIntervalMillis); ++ Assert.assertEquals(expectedNanos, commitLogService.markerIntervalNanos); ++ Assert.assertEquals(expectedNanos, commitLogService.syncIntervalNanos); ++ } ++ ++ private static long toNanos(long millis) ++ { ++ return TimeUnit.MILLISECONDS.toNanos(millis); + } + + private static class FakeCommitLogService extends AbstractCommitLogService + { + FakeCommitLogService(long syncIntervalMillis) + { + super(new FakeCommitLog(), "This is not a real commit log", syncIntervalMillis, true); + lastSyncedAt = 0; + } + - @Override - void start() - { - // nop - } - + protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) + { + // nop + } + } + + @Test + public void testSync() + { + long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2; + FreeRunningClock clock = new FreeRunningClock(); + FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis); + AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock); + FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog; + + // at time 0 + Assert.assertTrue(syncRunnable.sync()); + Assert.assertEquals(1, commitLog.markCount.get()); + Assert.assertEquals(0, commitLog.syncCount.get()); + + // at time DEFAULT_MARKER_INTERVAL_MILLIS + clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + Assert.assertTrue(syncRunnable.sync()); + Assert.assertEquals(2, commitLog.markCount.get()); + Assert.assertEquals(0, commitLog.syncCount.get()); + + // at time DEFAULT_MARKER_INTERVAL_MILLIS * 2 + clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + Assert.assertTrue(syncRunnable.sync()); + Assert.assertEquals(2, commitLog.markCount.get()); + Assert.assertEquals(1, commitLog.syncCount.get()); + + // at time DEFAULT_MARKER_INTERVAL_MILLIS * 3, but with shutdown! + clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + commitLogService.shutdown(); + Assert.assertFalse(syncRunnable.sync()); + Assert.assertEquals(2, commitLog.markCount.get()); + Assert.assertEquals(2, commitLog.syncCount.get()); + } + + private static class FakeCommitLog extends CommitLog + { + private final AtomicInteger markCount = new AtomicInteger(); + private final AtomicInteger syncCount = new AtomicInteger(); + + FakeCommitLog() + { - super(DatabaseDescriptor.getCommitLogLocation(), null); - } - - @Override - CommitLog start() - { - // this is a bit dicey. we need to start the allocator, but starting the parent's executor will muck things - // up as it is pointing to a different executor service, not the fake one in this test class. - allocator.start(); - return this; ++ super(null); + } + + @Override - public void sync(boolean syncAllSegments, boolean flush) ++ public void sync(boolean flush) + { + if (flush) + syncCount.incrementAndGet(); + else + markCount.incrementAndGet(); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000827a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java index 46a3fb0,c615880..0076eb6 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java @@@ -65,14 -64,14 +65,14 @@@ public class CommitLogSegmentBackpressu @Test @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync", - targetClass = "AbstractCommitLogService$1", - targetMethod = "run", + targetClass = "AbstractCommitLogService$SyncRunnable", + targetMethod = "sync", - targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", + targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)", action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"), @BMRule(name = "Release Semaphore after sync", - targetClass = "AbstractCommitLogService$1", - targetMethod = "run", + targetClass = "AbstractCommitLogService$SyncRunnable", + targetMethod = "sync", - targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", + targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)", action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")}) public void testCompressedCommitLogBackpressure() throws Throwable { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org