Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 67db1bf27 -> e916dff8b
Force batchlog replay before decommissioning a node patch by Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-7446 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e916dff8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e916dff8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e916dff8 Branch: refs/heads/cassandra-2.0 Commit: e916dff8ba032d878ad4435eb7175c6a56f79ef4 Parents: 67db1bf Author: Branimir Lambov <[email protected]> Authored: Fri Oct 17 03:18:37 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Fri Oct 17 03:18:37 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/BatchlogManager.java | 63 ++++++++++---------- .../cassandra/service/StorageService.java | 25 ++++++-- .../cassandra/db/BatchlogManagerTest.java | 8 +-- 4 files changed, 57 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cd4b6bb..73aaab0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.11: + * Force batchlog replay before decommissioning a node (CASSANDRA-7446) * Fix hint replay with many accumulated expired hints (CASSANDRA-6998) * Fix duplicate results in DISTINCT queries on static columns with query paging (CASSANDRA-8108) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index b92c217..48f4c3c 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -25,7 +25,6 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -69,8 +68,8 @@ public class BatchlogManager implements BatchlogManagerMBean public static final BatchlogManager instance = new BatchlogManager(); private final AtomicLong totalBatchesReplayed = new AtomicLong(); - private final AtomicBoolean isReplaying = new AtomicBoolean(); + // Single-thread executor service for scheduling and serializing log replay. public static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); public void start() @@ -108,6 +107,11 @@ public class BatchlogManager implements BatchlogManagerMBean public void forceBatchlogReplay() { + startBatchlogReplay(); + } + + public Future<?> startBatchlogReplay() + { Runnable runnable = new WrappedRunnable() { public void runMayThrow() throws ExecutionException, InterruptedException @@ -115,7 +119,8 @@ public class BatchlogManager implements BatchlogManagerMBean replayAllFailedBatches(); } }; - batchlogTasks.execute(runnable); + // If a replay is already in progress this request will be executed after it completes. + return batchlogTasks.submit(runnable); } public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid) @@ -156,12 +161,8 @@ public class BatchlogManager implements BatchlogManagerMBean return ByteBuffer.wrap(bos.toByteArray()); } - @VisibleForTesting - void replayAllFailedBatches() throws ExecutionException, InterruptedException + private void replayAllFailedBatches() throws ExecutionException, InterruptedException { - if (!isReplaying.compareAndSet(false, true)) - return; - logger.debug("Started replayAllFailedBatches"); // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). @@ -169,34 +170,27 @@ public class BatchlogManager implements BatchlogManagerMBean int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); - try - { - UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", - Keyspace.SYSTEM_KS, - SystemKeyspace.BATCHLOG_CF, - PAGE_SIZE); - - while (!page.isEmpty()) - { - UUID id = processBatchlogPage(page, rateLimiter); + UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", + Keyspace.SYSTEM_KS, + SystemKeyspace.BATCHLOG_CF, + PAGE_SIZE); - if (page.size() < PAGE_SIZE) - break; // we've exhausted the batchlog, next query would be empty. + while (!page.isEmpty()) + { + UUID id = processBatchlogPage(page, rateLimiter); - page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d", - Keyspace.SYSTEM_KS, - SystemKeyspace.BATCHLOG_CF, - id, - PAGE_SIZE); - } + if (page.size() < PAGE_SIZE) + break; // we've exhausted the batchlog, next query would be empty. - cleanup(); - } - finally - { - isReplaying.set(false); + page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d", + Keyspace.SYSTEM_KS, + SystemKeyspace.BATCHLOG_CF, + id, + PAGE_SIZE); } + cleanup(); + logger.debug("Finished replayAllFailedBatches"); } @@ -210,7 +204,7 @@ public class BatchlogManager implements BatchlogManagerMBean long writtenAt = row.getLong("written_at"); int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; // enough time for the actual write + batchlog entry mutation delivery (two separate requests). - long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation + long timeout = getBatchlogTimeout(); if (System.currentTimeMillis() < writtenAt + timeout) continue; // not ready to replay yet, might still get a deletion. replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter); @@ -218,6 +212,11 @@ public class BatchlogManager implements BatchlogManagerMBean return id; } + public long getBatchlogTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation + } + private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) { logger.debug("Replaying batch {}", id); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index cca6f79..56056ab 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2914,8 +2914,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (logger.isDebugEnabled()) logger.debug("DECOMMISSIONING"); startLeaving(); - setMode(Mode.LEAVING, "sleeping " + RING_DELAY + " ms for pending range setup", true); - Thread.sleep(RING_DELAY); + long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout()); + setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true); + Thread.sleep(timeout); Runnable finishLeaving = new Runnable() { @@ -2958,13 +2959,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE rangesToStream.put(keyspaceName, rangesMM); } - setMode(Mode.LEAVING, "streaming data to other nodes", true); + setMode(Mode.LEAVING, "replaying batch log and streaming data to other nodes", true); + // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint. + Future<?> batchlogReplay = BatchlogManager.instance.startBatchlogReplay(); Future<StreamState> streamSuccess = streamRanges(rangesToStream); + + // Wait for batch log to complete before streaming hints. + logger.debug("waiting for batch log processing."); + try + { + batchlogReplay.get(); + } + catch (ExecutionException | InterruptedException e) + { + throw new RuntimeException(e); + } + + setMode(Mode.LEAVING, "streaming hints to other nodes", true); + Future<StreamState> hintsSuccess = streamHints(); // wait for the transfer runnables to signal the latch. - logger.debug("waiting for stream aks."); + logger.debug("waiting for stream acks."); try { streamSuccess.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java index 0b6a908..846b008 100644 --- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java +++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java @@ -77,8 +77,8 @@ public class BatchlogManagerTest extends SchemaLoader assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches); assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches); - // Force batchlog replay. - BatchlogManager.instance.replayAllFailedBatches(); + // Force batchlog replay and wait for it to complete. + BatchlogManager.instance.startBatchlogReplay().get(); // Ensure that the first half, and only the first half, got replayed. assertEquals(500, BatchlogManager.instance.countAllBatches() - initialAllBatches); @@ -138,8 +138,8 @@ public class BatchlogManagerTest extends SchemaLoader // Flush the batchlog to disk (see CASSANDRA-6822). Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF).forceFlush(); - // Force batchlog replay. - BatchlogManager.instance.replayAllFailedBatches(); + // Force batchlog replay and wait for it to complete. + BatchlogManager.instance.startBatchlogReplay().get(); // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied. for (int i = 0; i < 1000; i++)
