Updated Branches: refs/heads/trunk 02769d5b6 -> b2dcd9406
flesh out BatchlogManagerMBean patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4635 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2dcd940 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2dcd940 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2dcd940 Branch: refs/heads/trunk Commit: b2dcd9406b779a2ef28041263a3580156b015dfd Parents: 02769d5 Author: Jonathan Ellis <[email protected]> Authored: Mon Sep 10 13:15:21 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Mon Sep 10 13:15:21 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../org/apache/cassandra/config/CFMetaData.java | 1 - .../org/apache/cassandra/db/BatchlogManager.java | 107 +++++++++------ .../apache/cassandra/db/BatchlogManagerMBean.java | 16 ++ 4 files changed, 82 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 896b8cc..95a8b18 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,5 @@ 1.2-beta1 - * add atomic_batch_mutate (CASSANDRA-4542) + * add atomic_batch_mutate (CASSANDRA-4542, -4635) * increase default max_hint_window_in_ms to 3h (CASSANDRA-4632) * include message initiation time to replicas so they can more accurately drop timed-out requests (CASSANDRA-2858) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 9ee684c..4e29fc7 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -194,7 +194,6 @@ public final class CFMetaData public static final CFMetaData BatchlogCF = compile(16, "CREATE TABLE " + SystemTable.BATCHLOG_CF + " (" + "id uuid PRIMARY KEY," - + "coordinator inet," + "written_at timestamp," + "data blob" + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 2ae9361..ded1ca4 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -23,6 +23,8 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -32,17 +34,16 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; -import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy; @@ -54,17 +55,18 @@ public class BatchlogManager implements BatchlogManagerMBean { private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager"; private static final int VERSION = MessagingService.VERSION_12; - private static final long TIMEOUT = 2 * DatabaseDescriptor.getRpcTimeout(); + private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout(); - private static final ByteBuffer COORDINATOR = columnName("coordinator"); private static final ByteBuffer WRITTEN_AT = columnName("written_at"); private static final ByteBuffer DATA = columnName("data"); - private static final SortedSet<ByteBuffer> META = ImmutableSortedSet.of(COORDINATOR, WRITTEN_AT); private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class); public static final BatchlogManager instance = new BatchlogManager(); + private final AtomicLong totalBatchesReplayed = new AtomicLong(); + private final AtomicBoolean isReplaying = new AtomicBoolean(); + public void start() { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -90,15 +92,43 @@ public class BatchlogManager implements BatchlogManagerMBean TimeUnit.MILLISECONDS); } + public int countAllBatches() + { + int count = 0; + + for (Row row : getRangeSlice(new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of()))) + { + if (row.cf != null && !row.cf.isMarkedForDelete()) + count++; + } + + return count; + } + + public long getTotalBatchesReplayed() + { + return totalBatchesReplayed.longValue(); + } + + public void forceBatchlogReplay() + { + Runnable runnable = new Runnable() + { + public void run() + { + replayAllFailedBatches(); + } + }; + StorageService.optionalTasks.execute(runnable); + } + public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid) { long timestamp = FBUtilities.timestampMicros(); - ByteBuffer coordinator = InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress()); ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000); ByteBuffer data = serializeRowMutations(mutations); ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCF); - cf.addColumn(new Column(COORDINATOR, coordinator, timestamp)); cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp)); cf.addColumn(new Column(DATA, data, timestamp)); RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid)); @@ -126,55 +156,38 @@ public class BatchlogManager implements BatchlogManagerMBean return ByteBuffer.wrap(bos.toByteArray()); } - private static void replayAllFailedBatches() + private void replayAllFailedBatches() { - if (logger.isDebugEnabled()) - logger.debug("Started replayAllFailedBatches"); - - ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF); - - if (store.isEmpty()) + if (!isReplaying.compareAndSet(false, true)) return; - IPartitioner partitioner = StorageService.getPartitioner(); - RowPosition minPosition = partitioner.getMinimumToken().minKeyBound(); - AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner); - - List<Row> rows = store.getRangeSlice(null, range, Integer.MAX_VALUE, new NamesQueryFilter(META), null); - - for (Row row : rows) + try { - if (row.cf.isMarkedForDelete()) - continue; - - IColumn coordinatorColumn = row.cf.getColumn(COORDINATOR); - IColumn writtenAtColumn = row.cf.getColumn(WRITTEN_AT); + logger.debug("Started replayAllFailedBatches"); - if (coordinatorColumn == null || writtenAtColumn == null) + for (Row row : getRangeSlice(new NamesQueryFilter(WRITTEN_AT))) { - replayBatch(row.key); - continue; - } + if (row.cf == null || row.cf.isMarkedForDelete()) + continue; - InetAddress coordinator = InetAddressType.instance.compose(coordinatorColumn.value()); - long writtenAt = LongType.instance.compose(writtenAtColumn.value()); - // if the batch is new and its coordinator is alive - give it a chance to complete naturally. - if (System.currentTimeMillis() < writtenAt + TIMEOUT && FailureDetector.instance.isAlive(coordinator)) - continue; - - replayBatch(row.key); + IColumn writtenAt = row.cf.getColumn(WRITTEN_AT); + if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT) + replayBatch(row.key); + } + } + finally + { + isReplaying.set(false); } - if (logger.isDebugEnabled()) - logger.debug("Finished replayAllFailedBatches"); + logger.debug("Finished replayAllFailedBatches"); } - private static void replayBatch(DecoratedKey key) + private void replayBatch(DecoratedKey key) { UUID uuid = UUIDType.instance.compose(key.key); - if (logger.isDebugEnabled()) - logger.debug("Replaying batch {}", uuid); + logger.debug("Replaying batch {}", uuid); ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF); QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA); @@ -195,6 +208,7 @@ public class BatchlogManager implements BatchlogManagerMBean } deleteBatch(key); + totalBatchesReplayed.incrementAndGet(); } private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException @@ -228,4 +242,13 @@ public class BatchlogManager implements BatchlogManagerMBean ByteBuffer raw = UTF8Type.instance.decompose(name); return CFMetaData.BatchlogCF.getCfDef().getColumnNameBuilder().add(raw).build(); } + + private static List<Row> getRangeSlice(IFilter columnFilter) + { + ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF); + IPartitioner partitioner = StorageService.getPartitioner(); + RowPosition minPosition = partitioner.getMinimumToken().minKeyBound(); + AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner); + return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java index 0322b21..2e60ba4 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java +++ b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java @@ -19,4 +19,20 @@ package org.apache.cassandra.db; public interface BatchlogManagerMBean { + /** + * Counts all batches currently in the batchlog. + * + * @return total batch count + */ + public int countAllBatches(); + + /** + * @return total count of batches replayed since node start + */ + public long getTotalBatchesReplayed(); + + /** + * Forces batchlog replay. Returns immediately if replay is already in progress. + */ + public void forceBatchlogReplay(); }
