Repository: cassandra
Updated Branches:
  refs/heads/trunk bf8ac1acd -> c35bfc09c
Optimize batchlog replay to avoid full scans

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-7237


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/762db474
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/762db474
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/762db474

Branch: refs/heads/trunk
Commit: 762db474273f764b189d3613fce33943cd64701b
Parents: ef59624
Author: Branimir Lambov <[email protected]>
Authored: Sat Aug 1 11:55:47 2015 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Thu Aug 6 17:12:28 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 NEWS.txt                                        |   2 +
 .../apache/cassandra/db/BatchlogManager.java    | 226 +++++++++--------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 +
 src/java/org/apache/cassandra/db/Memtable.java  |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  28 ++-
 .../apache/cassandra/dht/LocalPartitioner.java  |  30 ++-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/db/BatchlogManagerTest.java       | 246 +++++++++++++++----
 9 files changed, 395 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80e0e50..95fade9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Optimize batchlog replay to avoid full scans (CASSANDRA-7237)
  * Repair improvements when using vnodes (CASSANDRA-5220)
  * Disable scripted UDFs by default (CASSANDRA-9889)
  * Add transparent data encryption core classes (CASSANDRA-9945)
@@ -11,6 +12,7 @@ Merged from 2.1:
 Merged from 2.0:
  * Don't cast expected bf size to an int (CASSANDRA-9959)
 
+
 3.0.0-alpha1
  * Implement proper sandboxing for UDFs (CASSANDRA-9402)
  * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1fcbb12..ef61f6c 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -58,6 +58,8 @@ Upgrading
      be done by setting the new option `enabled` to `false`.
    - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY
      syntax has been deprecated since 2.1.0 and is being removed in 3.0.0.
+   - Batchlog entries are now stored in a new table - system.batches.
+     The old one has been deprecated.
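Why this avoids full scans, in brief: batch ids become timeuuids and system.batches moves to a LocalPartitioner over TimeUUIDType (see the SystemKeyspace and LocalPartitioner hunks below), so tokens order by time and each replay pass can page through only the half-open window (lastReplayedUuid, maxTimeUUID(now - timeout)]. Everything at or below the low bound is known to be deleted (possibly still tombstoned), and everything above the high bound is too fresh to touch. The self-contained sketch below shows just the window computation, using only the JDK. The bit layout and the min/max clock-seq-and-node constants mirror Cassandra's UUIDGen, but the class and method names here (ReplayWindow, msbFromTicks) are illustrative, not part of the patch, and the 2000 ms write timeout in main() is an assumed default.

import java.util.UUID;

// Sketch: computing the batchlog replay window as half-open time-UUID bounds.
public final class ReplayWindow
{
    // 100ns ticks between the UUID epoch (1582-10-15) and the Unix epoch (1970-01-01).
    private static final long UUID_EPOCH_OFFSET = 0x01b21dd213814000L;
    // Smallest/largest "clock seq and node" halves under signed-byte comparison,
    // mirroring Cassandra's UUIDGen constants.
    private static final long MIN_CLOCK_SEQ_AND_NODE = 0x8080808080808080L;
    private static final long MAX_CLOCK_SEQ_AND_NODE = 0x7f7f7f7f7f7f7f7fL;

    // Packs a 60-bit 100ns timestamp into the version-1 UUID msb layout.
    private static long msbFromTicks(long ticks)
    {
        long msb = 0L;
        msb |= (0x00000000ffffffffL & ticks) << 32;  // time_low
        msb |= (0x0000ffff00000000L & ticks) >>> 16; // time_mid
        msb |= (0x0fff000000000000L & ticks) >>> 48; // time_hi
        msb |= 0x0000000000001000L;                  // version 1
        return msb;
    }

    // Smallest time-UUID whose embedded timestamp is the given unix millisecond.
    public static UUID minTimeUUID(long unixMillis)
    {
        return new UUID(msbFromTicks(unixMillis * 10000 + UUID_EPOCH_OFFSET), MIN_CLOCK_SEQ_AND_NODE);
    }

    // Largest time-UUID whose embedded timestamp still falls inside the given millisecond.
    public static UUID maxTimeUUID(long unixMillis)
    {
        return new UUID(msbFromTicks(unixMillis * 10000 + 9999 + UUID_EPOCH_OFFSET), MAX_CLOCK_SEQ_AND_NODE);
    }

    public static void main(String[] args)
    {
        long batchlogTimeout = 2 * 2000; // 2 * write_request_timeout_in_ms (assumed 2000ms default)
        UUID lower = minTimeUUID(0);     // initial lastReplayedUuid, as in the patch
        UUID upper = maxTimeUUID(System.currentTimeMillis() - batchlogTimeout);
        // The replay pass pages through:
        //   SELECT ... WHERE token(id) > token(lower) AND token(id) <= token(upper)
        System.out.println("replay window: (" + lower + ", " + upper + "]");
    }
}

With the assumed defaults, the upper bound trails "now" by four seconds (twice the write RPC timeout), which leaves in-flight batches alone: anything older has either lost its coordinator, and must be replayed, or was already deleted when its batch succeeded.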
 2.2


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 9e90d9d..8ea4318 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -23,30 +23,24 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -57,20 +51,22 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
 
 public class BatchlogManager implements BatchlogManagerMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
-    private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
-    private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
+    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+    private static final int DEFAULT_PAGE_SIZE = 128;
 
     private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
     public static final BatchlogManager instance = new BatchlogManager();
 
-    private final AtomicLong totalBatchesReplayed = new AtomicLong();
+    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
 
     // Single-thread executor service for scheduling and serializing log replay.
     private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
 
@@ -87,15 +83,20 @@ public class BatchlogManager implements BatchlogManagerMBean
             throw new RuntimeException(e);
         }
 
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws ExecutionException, InterruptedException
-            {
-                replayAllFailedBatches();
-            }
-        };
+        batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+
+        batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches,
+                                             StorageService.RING_DELAY + REPLAY_INTERVAL,
+                                             REPLAY_INTERVAL,
+                                             TimeUnit.MILLISECONDS);
+    }
+
+    private void replayInitially()
+    {
+        // Initial run must take care of non-time-uuid batches as written by Version 1.2.
+        convertOldBatchEntries();
 
-        batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
+        replayAllFailedBatches();
     }
 
     public static void shutdown() throws InterruptedException
@@ -106,13 +107,16 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public int countAllBatches()
     {
-        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG);
-        return (int) executeInternal(query).one().getLong("count");
+        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+        UntypedResultSet results = executeInternal(query);
+        if (results.isEmpty())
+            return 0;
+        return (int) results.one().getLong("count");
     }
 
     public long getTotalBatchesReplayed()
     {
-        return totalBatchesReplayed.longValue();
+        return totalBatchesReplayed;
     }
 
     public void forceBatchlogReplay() throws Exception
@@ -122,34 +126,27 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public Future<?> startBatchlogReplay()
     {
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws ExecutionException, InterruptedException
-            {
-                replayAllFailedBatches();
-            }
-        };
         // If a replay is already in progress this request will be executed after it completes.
-        return batchlogTasks.submit(runnable);
+        return batchlogTasks.submit(this::replayAllFailedBatches);
     }
 
-    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
+    void performInitialReplay() throws InterruptedException, ExecutionException
     {
-        return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
+        // Invokes initial replay. Used for testing only.
+        batchlogTasks.submit(this::replayInitially).get();
     }
 
-    @VisibleForTesting
-    static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now)
+    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
     {
-        return new RowUpdateBuilder(SystemKeyspace.Batchlog, now, uuid)
+        return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid)
                .clustering()
                .add("data", serializeMutations(mutations, version))
-               .add("written_at", new Date(now / 1000))
                .add("version", version)
                .build();
     }
 
-    private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
+    @VisibleForTesting
+    static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
     {
         try (DataOutputBuffer buf = new DataOutputBuffer())
         {
@@ -164,7 +161,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         }
     }
 
-    private void replayAllFailedBatches() throws ExecutionException, InterruptedException
+    private void replayAllFailedBatches()
     {
         logger.debug("Started replayAllFailedBatches");
 
@@ -173,67 +170,62 @@ public class BatchlogManager implements BatchlogManagerMBean
         int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 
-        UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
-                                                              SystemKeyspace.NAME,
-                                                              SystemKeyspace.BATCHLOG,
-                                                              PAGE_SIZE));
-
-        while (!page.isEmpty())
-        {
-            UUID id = processBatchlogPage(page, rateLimiter);
-
-            if (page.size() < PAGE_SIZE)
-                break; // we've exhausted the batchlog, next query would be empty.
-
-            page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
-                                                 SystemKeyspace.NAME,
-                                                 SystemKeyspace.BATCHLOG,
-                                                 PAGE_SIZE),
-                                   id);
-        }
+        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+        int pageSize = calculatePageSize();
+        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+        // token(id) > token(lastReplayedUuid) as part of the query.
+        String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES);
+        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+        processBatchlogEntries(batches, pageSize, rateLimiter);
+        lastReplayedUuid = limitUuid;
+        logger.debug("Finished replayAllFailedBatches");
+    }
 
-        cleanup();
+    // read fewer rows (batches) per page if they are very large
+    private static int calculatePageSize()
+    {
+        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+        double averageRowSize = store.getMeanPartitionSize();
+        if (averageRowSize <= 0)
+            return DEFAULT_PAGE_SIZE;
 
-        logger.debug("Finished replayAllFailedBatches");
+        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
     }
 
-    private void deleteBatch(UUID id)
+    private static void deleteBatch(UUID id)
     {
         Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
                                                     UUIDType.instance.decompose(id),
                                                     FBUtilities.timestampMicros(),
                                                     FBUtilities.nowInSeconds()));
         mutation.apply();
     }
 
-    private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
+    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
    {
-        UUID id = null;
-        ArrayList<Batch> batches = new ArrayList<>(page.size());
+        int positionInPage = 0;
+        ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
 
         // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
-        for (UntypedResultSet.Row row : page)
+        for (UntypedResultSet.Row row : batches)
         {
-            id = row.getUUID("id");
-            long writtenAt = row.getLong("written_at");
-            // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
-            long timeout = getBatchlogTimeout();
-            if (System.currentTimeMillis() < writtenAt + timeout)
-                continue; // not ready to replay yet, might still get a deletion.
-
-            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
-            Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
+            UUID id = row.getUUID("id");
+            int version = row.getInt("version");
+            Batch batch = new Batch(id, row.getBytes("data"), version);
             try
             {
                 if (batch.replay(rateLimiter) > 0)
                 {
-                    batches.add(batch);
+                    unfinishedBatches.add(batch);
                 }
                 else
                 {
                     deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
-                    totalBatchesReplayed.incrementAndGet();
+                    ++totalBatchesReplayed;
                 }
             }
             catch (IOException e)
@@ -241,22 +233,31 @@
                 logger.warn("Skipped batch replay of {} due to {}", id, e);
                 deleteBatch(id);
             }
+
+            if (++positionInPage == pageSize)
+            {
+                // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
+                // finish processing the page before requesting the next row.
+                finishAndClearBatches(unfinishedBatches);
+                positionInPage = 0;
+            }
         }
+        finishAndClearBatches(unfinishedBatches);
+    }
 
-        // now waiting for all batches to complete their processing
+    private void finishAndClearBatches(ArrayList<Batch> batches)
+    {
         // schedule hints for timed out deliveries
         for (Batch batch : batches)
         {
             batch.finish();
             deleteBatch(batch.id);
         }
-
-        totalBatchesReplayed.addAndGet(batches.size());
-
-        return id;
+        totalBatchesReplayed += batches.size();
+        batches.clear();
     }
 
-    public long getBatchlogTimeout()
+    public static long getBatchlogTimeout()
     {
         return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
     }
 
@@ -270,10 +271,10 @@
 
         private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
 
-        public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
+        Batch(UUID id, ByteBuffer data, int version)
         {
             this.id = id;
-            this.writtenAt = writtenAt;
+            this.writtenAt = UUIDGen.unixTimestamp(id);
             this.data = data;
             this.version = version;
         }
@@ -366,7 +367,7 @@
             }
         }
 
-        private List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
+        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
         {
             List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
             for (Mutation mutation : mutations)
@@ -384,7 +385,7 @@
          *
          * @return direct delivery handler to wait on or null, if no live nodes found
          */
-        private ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
         {
             Set<InetAddress> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
@@ -429,9 +430,9 @@
      */
     private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
     {
-        private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+        private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-        public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+        ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
         {
             super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
             undelivered.addAll(writeEndpoints);
@@ -453,17 +454,42 @@
         }
     }
 
-    // force flush + compaction to reclaim space from the replayed batches
-    private void cleanup() throws ExecutionException, InterruptedException
+    @SuppressWarnings("deprecation")
+    private static void convertOldBatchEntries()
     {
-        ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
-        cfs.forceBlockingFlush();
-        Collection<Descriptor> descriptors = new ArrayList<>();
-        // expects ALL sstables to be available for compaction, so just use live set...
-        for (SSTableReader sstr : cfs.getSSTables(SSTableSet.LIVE))
-            descriptors.add(sstr.descriptor);
-        if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
-            CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
+        logger.debug("Started convertOldBatchEntries");
+
+        String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.LEGACY_BATCHLOG);
+        UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE);
+        int convertedBatches = 0;
+        for (UntypedResultSet.Row row : batches)
+        {
+            UUID id = row.getUUID("id");
+            long timestamp = row.getLong("written_at");
+            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+            logger.debug("Converting mutation at " + timestamp);
+
+            UUID newId = id;
+            if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id))
+                newId = UUIDGen.getTimeUUID(timestamp, convertedBatches);
+            ++convertedBatches;
+
+            Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches,
+                                                   FBUtilities.timestampMicros(),
+                                                   newId)
+                              .clustering()
+                              .add("data", row.getBytes("data"))
+                              .add("version", version)
+                              .build();
+
+            addRow.apply();
+        }
+        if (convertedBatches > 0)
+            Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
+        // cleanup will be called after replay
+        logger.debug("Finished convertOldBatchEntries");
     }
 
     public static class EndpointFilter
@@ -504,9 +530,7 @@
             if (validated.keySet().size() == 1)
             {
                 // we have only 1 `other` rack
-                // pick up to two random nodes from there
-                List<InetAddress> otherRack = validated.get(validated.keySet().iterator().next());
-                Collections.shuffle(otherRack);
+                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
                 return Lists.newArrayList(Iterables.limit(otherRack, 2));
             }
 
@@ -519,7 +543,7 @@
             else
             {
                 racks = Lists.newArrayList(validated.keySet());
-                Collections.shuffle((List) racks);
+                Collections.shuffle((List<String>) racks);
             }
 
             // grab a random member of up to two racks


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1f3c7db..255f9a0 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2054,6 +2054,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return count > 0 ? (int) (sum / count) : 0;
     }
 
+    public double getMeanPartitionSize()
+    {
+        long sum = 0;
+        long count = 0;
+        for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
+        {
+            long n = sstable.getEstimatedPartitionSize().count();
+            sum += sstable.getEstimatedPartitionSize().mean() * n;
+            count += n;
+        }
+        return count > 0 ? sum * 1.0 / count : 0;
+    }
+
     public long estimateKeys()
     {
         long n = 0;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a950e17..2db0ce9 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -342,7 +342,7 @@ public class Memtable implements Comparable<Memtable>
                           + liveDataSize.get()) // data
                          * 1.2); // bloom filter and row index overhead
 
-        this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
+        this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
     }
 
     public long getExpectedWriteSize()


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2d0ca24..bc0be65 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -88,7 +89,7 @@ public final class SystemKeyspace
     public static final String NAME = "system";
 
     public static final String HINTS = "hints";
-    public static final String BATCHLOG = "batchlog";
+    public static final String BATCHES = "batches";
     public static final String PAXOS = "paxos";
     public static final String BUILT_INDEXES = "IndexInfo";
     public static final String LOCAL = "local";
@@ -102,6 +103,7 @@ public final class SystemKeyspace
     public static final String MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS = "materialized_views_builds_in_progress";
     public static final String BUILT_MATERIALIZED_VIEWS = "built_materialized_views";
 
+    @Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
     @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
     @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
     @Deprecated public static final String LEGACY_COLUMNS = "schema_columns";
@@ -123,15 +125,15 @@ public final class SystemKeyspace
                 .compaction(CompactionParams.scts(singletonMap("enabled", "false")))
                 .gcGraceSeconds(0);
 
-    public static final CFMetaData Batchlog =
-        compile(BATCHLOG,
+    public static final CFMetaData Batches =
+        compile(BATCHES,
                 "batches awaiting replay",
                 "CREATE TABLE %s ("
-                + "id uuid,"
+                + "id timeuuid,"
                 + "data blob,"
                 + "version int,"
-                + "written_at timestamp,"
                 + "PRIMARY KEY ((id)))")
+                .copy(new LocalPartitioner(TimeUUIDType.instance))
                 .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
                 .gcGraceSeconds(0);
 
@@ -280,6 +282,19 @@ public final class SystemKeyspace
                + "PRIMARY KEY ((keyspace_name), view_name))");
 
     @Deprecated
+    public static final CFMetaData LegacyBatchlog =
+        compile(LEGACY_BATCHLOG,
+                "*DEPRECATED* batchlog entries",
+                "CREATE TABLE %s ("
+                + "id uuid,"
+                + "data blob,"
+                + "version int,"
+                + "written_at timestamp,"
+                + "PRIMARY KEY ((id)))")
+                .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
+                .gcGraceSeconds(0);
+
+    @Deprecated
     public static final CFMetaData LegacyKeyspaces =
         compile(LEGACY_KEYSPACES,
                 "*DEPRECATED* keyspace definitions",
@@ -409,7 +424,7 @@ public final class SystemKeyspace
     {
         return Tables.of(BuiltIndexes,
                          Hints,
-                         Batchlog,
+                         Batches,
                          Paxos,
                          Local,
                          Peers,
@@ -421,6 +436,7 @@ public final class SystemKeyspace
                          AvailableRanges,
                          MaterializedViewsBuildsInProgress,
                          BuiltMaterializedViews,
+                         LegacyBatchlog,
                          LegacyKeyspaces,
                          LegacyColumnfamilies,
                          LegacyColumns,


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 2a5a16e..f9421c5 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -66,9 +66,37 @@ public class LocalPartitioner implements IPartitioner
 
     public Token.TokenFactory getTokenFactory()
     {
-        throw new UnsupportedOperationException();
+        return tokenFactory;
     }
 
+    private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
+    {
+        public ByteBuffer toByteArray(Token token)
+        {
+            return ((LocalToken)token).token;
+        }
+
+        public Token fromByteArray(ByteBuffer bytes)
+        {
+            return new LocalToken(bytes);
+        }
+
+        public String toString(Token token)
+        {
+            return comparator.getString(((LocalToken)token).token);
+        }
+
+        public void validate(String token)
+        {
+            comparator.validate(comparator.fromString(token));
+        }
+
+        public Token fromString(String string)
+        {
+            return new LocalToken(comparator.fromString(string));
+        }
+    };
+
     public boolean preservesOrder()
     {
         return true;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 51aa48f..b637b17 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -863,7 +863,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                    null,
                                                                    WriteType.SIMPLE);
         Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
                                                     UUIDType.instance.decompose(uuid),
                                                     FBUtilities.timestampMicros(),
                                                     FBUtilities.nowInSeconds()));


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 5f1523e..fbb7a5b 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -17,58 +17,74 @@
  */
 package org.apache.cassandra.db;
 
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.partitions.ArrayBackedPartition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import org.junit.BeforeClass;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.collect.Lists;
+
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.partitions.ArrayBackedPartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
 public class BatchlogManagerTest
 {
     private static final String KEYSPACE1 = "BatchlogManagerTest1";
     private static final String CF_STANDARD1 = "Standard1";
     private static final String CF_STANDARD2 = "Standard2";
     private static final String CF_STANDARD3 = "Standard3";
+    private static final String CF_STANDARD4 = "Standard4";
+
+    static PartitionerSwitcher sw;
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
+        sw = Util.switchPartitioner(Murmur3Partitioner.instance);
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3));
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance));
         System.out.println(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata.partitionKeyColumns());
     }
 
+    @AfterClass
+    public static void cleanup()
+    {
+        sw.close();
+    }
+
     @Before
     public void setUp() throws Exception
     {
@@ -76,6 +92,8 @@ public class BatchlogManagerTest
         InetAddress localhost = InetAddress.getByName("127.0.0.1");
         metadata.updateNormalToken(Util.token("A"), localhost);
         metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+        Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.Batches.cfId).truncateBlocking();
+        Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.LegacyBatchlog.cfId).truncateBlocking();
     }
 
     @Test
@@ -122,18 +140,17 @@ public class BatchlogManagerTest
                          .build();
 
             long timestamp = i < 500
-                           ? (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000
-                           : Long.MAX_VALUE;
-
-            Mutation m2 = BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
-                                                                 UUIDGen.getTimeUUID(),
-                                                                 MessagingService.current_version,
-                                                                 timestamp);
-            m2.applyUnsafe();
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+            BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
+                                                   UUIDGen.getTimeUUID(timestamp, i),
+                                                   MessagingService.current_version)
+                           .applyUnsafe();
         }
 
         // Flush the batchlog to disk (see CASSANDRA-6822).
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG).forceBlockingFlush();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
 
         assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
@@ -165,25 +182,29 @@ public class BatchlogManagerTest
         assertEquals(500, result.one().getLong("count"));
     }
 
-    /*
     @Test
     public void testTruncatedReplay() throws InterruptedException, ExecutionException
     {
-        CellNameType comparator2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2").metadata.comparator;
-        CellNameType comparator3 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard3").metadata.comparator;
+        CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
+        CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
         // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
         // Each batchlog entry with a mutation for Standard2 and Standard3.
         // In the middle of the process, 'truncate' Standard2.
         for (int i = 0; i < 1000; i++)
         {
-            Mutation mutation1 = new Mutation(KEYSPACE1, bytes(i));
-            mutation1.add("Standard2", comparator2.makeCellName(bytes(i)), bytes(i), 0);
-            Mutation mutation2 = new Mutation(KEYSPACE1, bytes(i));
-            mutation2.add("Standard3", comparator3.makeCellName(bytes(i)), bytes(i), 0);
+            Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), bytes(i))
+                                 .clustering("name" + i)
+                                 .add("val", "val" + i)
+                                 .build();
+            Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), bytes(i))
+                                 .clustering("name" + i)
+                                 .add("val", "val" + i)
+                                 .build();
+            List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
 
             // Make sure it's ready to be replayed, so adjust the timestamp.
-            long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+            long timestamp = System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout();
 
             if (i == 500)
                 SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"),
@@ -197,14 +218,13 @@ public class BatchlogManagerTest
                 timestamp--;
 
             BatchlogManager.getBatchlogMutationFor(mutations,
-                                                   UUIDGen.getTimeUUID(),
-                                                   MessagingService.current_version,
-                                                   timestamp * 1000)
+                                                   UUIDGen.getTimeUUID(timestamp, i),
+                                                   MessagingService.current_version)
                            .applyUnsafe();
         }
 
         // Flush the batchlog to disk (see CASSANDRA-6822).
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG).forceFlush();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
 
         // Force batchlog replay and wait for it to complete.
         BatchlogManager.instance.startBatchlogReplay().get();
@@ -216,8 +236,8 @@ public class BatchlogManagerTest
             if (i >= 500)
             {
                 assertEquals(bytes(i), result.one().getBytes("key"));
-                assertEquals(bytes(i), result.one().getBytes("column1"));
-                assertEquals(bytes(i), result.one().getBytes("value"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
             }
             else
             {
@@ -229,9 +249,143 @@ public class BatchlogManagerTest
         {
             UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
             assertEquals(bytes(i), result.one().getBytes("key"));
-            assertEquals(bytes(i), result.one().getBytes("column1"));
-            assertEquals(bytes(i), result.one().getBytes("value"));
+            assertEquals("name" + i, result.one().getString("name"));
+            assertEquals("val" + i, result.one().getString("val"));
         }
     }
-    */
+
+    static Mutation fakeVersion12MutationFor(Collection<Mutation> mutations, long now) throws IOException
+    {
+        // Serialization can't write version 1.2 mutations, pretend this is old by using random id and written_at and
+        // saving it in the legacy batchlog.
+        UUID uuid = UUID.randomUUID();
+        ByteBuffer writtenAt = LongType.instance.decompose(now);
+        int version = MessagingService.VERSION_30;
+        ByteBuffer data = BatchlogManager.serializeMutations(mutations, version);
+
+        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
+               .clustering()
+               .add("written_at", writtenAt)
+               .add("data", data)
+               .add("version", version)
+               .build();
+    }
+
+    static Mutation fakeVersion20MutationFor(Collection<Mutation> mutations, UUID uuid)
+    {
+        // Serialization can't write version 1.2 mutations, pretend this is old by saving it in the legacy batchlog.
+        int version = MessagingService.VERSION_30;
+        ByteBuffer writtenAt = LongType.instance.decompose(UUIDGen.unixTimestamp(uuid));
+        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
+               .clustering()
+               .add("data", BatchlogManager.serializeMutations(mutations, version))
+               .add("written_at", writtenAt)
+               .add("version", version)
+               .build();
+    }
+
+    @Test
+    public void testConversion() throws Exception
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+        CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
+
+        // Generate 1000 mutations and put them all into the batchlog.
+        // Half (500) ready to be replayed, half not.
+        for (int i = 0; i < 1000; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                                .clustering("name" + i)
+                                .add("val", "val" + i)
+                                .build();
+
+            long timestamp = i < 500
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+            fakeVersion12MutationFor(Collections.singleton(mutation), timestamp).applyUnsafe();
+        }
+
+        // Add 400 version 2.0 mutations and put them all into the batchlog.
+        // Half (200) ready to be replayed, half not.
+        for (int i = 1000; i < 1400; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                                .clustering("name" + i)
+                                .add("val", "val" + i)
+                                .build();
+
+            long timestamp = i < 1200
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+            fakeVersion20MutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(timestamp, i)).applyUnsafe();
+        }
+
+        // Mix in 100 current version mutations, 50 ready for replay.
+        for (int i = 1400; i < 1500; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                                .clustering("name" + i)
+                                .add("val", "val" + i)
+                                .build();
+
+            long timestamp = i < 1450
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+            BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation),
+                                                   UUIDGen.getTimeUUID(timestamp, i),
+                                                   MessagingService.current_version)
+                           .applyUnsafe();
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertEquals("Count in blog legacy", 1400, result.one().getLong("count"));
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertEquals("Count in blog", 100, result.one().getLong("count"));
+
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.performInitialReplay();
+
+        // Ensure that the first half, and only the first half, got replayed.
+        assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        for (int i = 0; i < 1500; i++)
+        {
+            result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
+            if (i < 500 || i >= 1000 && i < 1200 || i >= 1400 && i < 1450)
+            {
+                assertEquals(bytes(i), result.one().getBytes("key"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
+            }
+            else
+            {
+                assertTrue("Present at " + i, result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations got somehow applied.
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
+        assertEquals(750, result.one().getLong("count"));
+
+        // Ensure batchlog is left as expected.
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertEquals("Count in blog legacy after initial replay", 0, result.one().getLong("count"));
+    }
 }
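
A note on the paging arithmetic used above: instead of a fixed 128-row page, replay now reads fewer batches per page when batches are large, targeting roughly 4MB of data per page. The sketch below reproduces that arithmetic standalone: meanPartitionSize() mirrors the new ColumnFamilyStore.getMeanPartitionSize() (a count-weighted mean over per-sstable estimates) and calculatePageSize() mirrors BatchlogManager.calculatePageSize(). The PageSizing class name and the long[][] stand-in for the sstable size histograms are illustrative only.

// Sketch: page sizing for batchlog replay, derived from mean partition size.
public final class PageSizing
{
    static final int DEFAULT_PAGE_SIZE = 128;              // cap, as in the patch
    static final long TARGET_PAGE_BYTES = 4 * 1024 * 1024; // ~4MB of batch data per page

    // Count-weighted mean partition size; each entry is { meanBytes, partitionCount }.
    static double meanPartitionSize(long[][] sstableEstimates)
    {
        long sum = 0;
        long count = 0;
        for (long[] estimate : sstableEstimates)
        {
            sum += estimate[0] * estimate[1];
            count += estimate[1];
        }
        return count > 0 ? sum * 1.0 / count : 0;
    }

    // Read fewer rows (batches) per page if they are very large; clamp to [1, 128].
    static int calculatePageSize(double averageRowSize)
    {
        if (averageRowSize <= 0)
            return DEFAULT_PAGE_SIZE;
        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, TARGET_PAGE_BYTES / averageRowSize));
    }

    public static void main(String[] args)
    {
        // e.g. 1000 partitions averaging 64KiB plus 50 averaging 1MiB
        long[][] estimates = { { 64 * 1024, 1000 }, { 1024 * 1024, 50 } };
        double mean = meanPartitionSize(estimates);
        System.out.println("mean=" + (long) mean + "B -> pageSize=" + calculatePageSize(mean));
    }
}

Small batches keep the full 128-row page; once the mean crosses 32KB (4MB / 128) the page starts shrinking, bottoming out at one row per page for multi-megabyte batches.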
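On the one-time conversion in convertOldBatchEntries(): a legacy entry keeps its id only if that id is already a version-1 UUID whose embedded time equals written_at; otherwise a deterministic time-UUID is synthesized from written_at plus a running sequence number, so converted ids stay unique. The sketch below isolates that decision. unixTimestamp() relies on java.util.UUID.timestamp(), which is defined only for version-1 UUIDs, and the lsb packing in timeUUID() is a simplification made for this sketch (the patch calls UUIDGen.getTimeUUID(timestamp, convertedBatches), which derives the low bits differently); the rule itself mirrors the convertOldBatchEntries hunk above.

import java.util.UUID;

// Sketch: id conversion rule for legacy batchlog entries.
public final class LegacyBatchIdConversion
{
    // 100ns ticks between the UUID epoch and the Unix epoch (same constant as the ReplayWindow sketch).
    private static final long UUID_EPOCH_OFFSET = 0x01b21dd213814000L;

    // Unix milliseconds embedded in a version-1 UUID.
    static long unixTimestamp(UUID id)
    {
        return (id.timestamp() - UUID_EPOCH_OFFSET) / 10000;
    }

    // Deterministic version-1 UUID: time from written_at, low bits from a sequence number.
    static UUID timeUUID(long unixMillis, long sequence)
    {
        long ticks = unixMillis * 10000 + UUID_EPOCH_OFFSET;
        long msb = 0L;
        msb |= (0x00000000ffffffffL & ticks) << 32;
        msb |= (0x0000ffff00000000L & ticks) >>> 16;
        msb |= (0x0fff000000000000L & ticks) >>> 48;
        msb |= 0x1000L; // version 1
        long lsb = 0x8000000000000000L | (sequence & 0x3fffffffffffffffL); // RFC variant bits + sequence
        return new UUID(msb, lsb);
    }

    // The conversion rule from the patch: reuse the id when it already encodes written_at.
    static UUID convertedId(UUID legacyId, long writtenAt, int convertedSoFar)
    {
        if (legacyId.version() == 1 && unixTimestamp(legacyId) == writtenAt)
            return legacyId;
        return timeUUID(writtenAt, convertedSoFar);
    }

    public static void main(String[] args)
    {
        UUID legacy = UUID.randomUUID(); // a random (version 4) id, as a 2.0-era node wrote them
        long writtenAt = System.currentTimeMillis() - 60000;
        UUID converted = convertedId(legacy, writtenAt, 0);
        System.out.println(legacy + " -> " + converted
                           + " (version=" + converted.version()
                           + ", embedded ts=" + unixTimestamp(converted) + ")");
    }
}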
