Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 e7b3deee6 -> 92c38c0e6
Make batchlog replay asynchronous

patch by Oleg Anastasyev; reviewed by Aleksey Yeschenko for CASSANDRA-6134


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/92c38c0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/92c38c0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/92c38c0e

Branch: refs/heads/cassandra-2.1
Commit: 92c38c0e6a5e23bdb77c23073a28f118a9f23add
Parents: e7b3dee
Author: Aleksey Yeschenko <[email protected]>
Authored: Thu May 15 01:13:09 2014 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Thu May 15 01:13:09 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                   |   1 +
 .../apache/cassandra/db/BatchlogManager.java  | 287 ++++++++++++-------
 2 files changed, 188 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/92c38c0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3dd47a1..d43a0f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * Fix repair hang when given CF does not exist (CASSANDRA-7189)
  * Allow c* to be shutdown in an embedded mode (CASSANDRA-5635)
  * Add server side batching to native transport (CASSANDRA-5663)
+ * Make batchlog replay asynchronous (CASSANDRA-6134)
 Merged from 2.0:
  * (Hadoop) Close java driver Cluster in CQLRR.close (CASSANDRA-7228)
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/92c38c0e/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 3ffc7a7..1a441f6 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -48,6 +48,8 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -193,162 +195,247 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Finished replayAllFailedBatches");
     }
 
-    // returns the UUID of the last seen batch
+    private void deleteBatch(UUID id)
+    {
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
+        mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
+        mutation.apply();
+    }
+
     private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
     {
         UUID id = null;
+        ArrayList<Batch> batches = new ArrayList<>(page.size());
+
+        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
         for (UntypedResultSet.Row row : page)
         {
             id = row.getUUID("id");
             long writtenAt = row.getLong("written_at");
-            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
             // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
             long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
             if (System.currentTimeMillis() < writtenAt + timeout)
                 continue; // not ready to replay yet, might still get a deletion.
-            replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter);
+
+            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+            Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
+            try
+            {
+                if (batch.replay(rateLimiter) > 0)
+                {
+                    batches.add(batch);
+                }
+                else
+                {
+                    deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
+                    totalBatchesReplayed.incrementAndGet();
+                }
+            }
+            catch (IOException e)
+            {
+                logger.warn("Skipped batch replay of {} due to {}", id, e);
+                deleteBatch(id);
+            }
+        }
+
+        // now waiting for all batches to complete their processing
+        // schedule hints for timed out deliveries
+        for (Batch batch : batches)
+        {
+            batch.finish();
+            deleteBatch(batch.id);
         }
+
+        totalBatchesReplayed.addAndGet(batches.size());
+
         return id;
     }
 
-    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter)
+    private static class Batch
     {
-        logger.debug("Replaying batch {}", id);
+        private final UUID id;
+        private final long writtenAt;
+        private final ByteBuffer data;
+        private final int version;
 
-        try
+        private List<ReplayWriteResponseHandler> replayHandlers;
+
+        public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
         {
-            replaySerializedMutations(data, writtenAt, version, rateLimiter);
+            this.id = id;
+            this.writtenAt = writtenAt;
+            this.data = data;
+            this.version = version;
         }
-        catch (IOException e)
+
+        public int replay(RateLimiter rateLimiter) throws IOException
         {
-            logger.warn("Skipped batch replay of {} due to {}", id, e);
-        }
+            logger.debug("Replaying batch {}", id);
 
-        deleteBatch(id);
+            List<Mutation> mutations = replayingMutations();
 
-        totalBatchesReplayed.incrementAndGet();
-    }
+            if (mutations.isEmpty())
+                return 0;
 
-    private void deleteBatch(UUID id)
-    {
-        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
-        mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
-        mutation.apply();
-    }
+            int ttl = calculateHintTTL(mutations);
+            if (ttl <= 0)
+                return 0;
 
-    private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
-    {
-        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
-        int size = in.readInt();
-        List<Mutation> mutations = new ArrayList<>(size);
+            replayHandlers = sendReplays(mutations, writtenAt, ttl);
+
+            rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
 
-        for (int i = 0; i < size; i++)
+            return replayHandlers.size();
+        }
+
+        public void finish()
         {
-            Mutation mutation = Mutation.serializer.deserialize(in, version);
+            for (int i = 0; i < replayHandlers.size(); i++)
+            {
+                ReplayWriteResponseHandler handler = replayHandlers.get(i);
+                try
+                {
+                    handler.get();
+                }
+                catch (WriteTimeoutException e)
+                {
+                    logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+                    // writing hints for the rest to hints, starting from i
+                    writeHintsForUndeliveredEndpoints(i);
+                    return;
+                }
+            }
+        }
+
+        private List<Mutation> replayingMutations() throws IOException
+        {
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
+            int size = in.readInt();
+            List<Mutation> mutations = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
+            {
+                Mutation mutation = Mutation.serializer.deserialize(in, version);
 
-            // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
-            // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then
-            // truncated.
-            for (UUID cfId : mutation.getColumnFamilyIds())
-                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
-                    mutation = mutation.without(cfId);
+                // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+                // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
+                // truncated.
+                for (UUID cfId : mutation.getColumnFamilyIds())
+                    if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+                        mutation = mutation.without(cfId);
 
-            if (!mutation.isEmpty())
-                mutations.add(mutation);
+                if (!mutation.isEmpty())
+                    mutations.add(mutation);
+            }
+            return mutations;
         }
 
-        if (!mutations.isEmpty())
-            replayMutations(mutations, writtenAt, version, rateLimiter);
-    }
+        private void writeHintsForUndeliveredEndpoints(int startFrom)
+        {
+            try
+            {
+                // Here we deserialize mutations 2nd time from byte buffer.
+                // but this is ok, because timeout on batch direct delivery is rare
+                // (it can happen only several seconds until node is marked dead)
+                // so trading some cpu to keep less objects
+                List<Mutation> replayingMutations = replayingMutations();
+                for (int i = startFrom; i < replayHandlers.size(); i++)
+                {
+                    Mutation undeliveredMutation = replayingMutations.get(i);
+                    int ttl = calculateHintTTL(replayingMutations);
+                    ReplayWriteResponseHandler handler = replayHandlers.get(i);
 
-    /*
-     * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
-     * when a replica is down or a write request times out.
-     */
-    private void replayMutations(List<Mutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
-    {
-        int ttl = calculateHintTTL(mutations, writtenAt);
-        if (ttl <= 0)
-            return; // this batchlog entry has 'expired'
-
-        List<InetAddress> liveEndpoints = new ArrayList<>();
-        List<InetAddress> hintEndpoints = new ArrayList<>();
-
-        for (Mutation mutation : mutations)
+                    if (ttl > 0 && handler != null)
+                        for (InetAddress endpoint : handler.undelivered)
+                            StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint);
+                }
+            }
+            catch (IOException e)
+            {
+                logger.error("Cannot schedule hints for undelivered batch", e);
+            }
+        }
+
+        private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
         {
+            List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size());
+            for (Mutation mutation : mutations)
+            {
+                ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
+                if (handler != null)
+                    handlers.add(handler);
+            }
+            return handlers;
+        }
+
+        /**
+         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+         * when a replica is down or a write request times out.
+         *
+         * @return direct delivery handler to wait on or null, if no live nodes found
+         */
+        private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+        {
+            Set<InetAddress> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
-            Token tk = StorageService.getPartitioner().getToken(mutation.key());
-            int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+            Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
 
             for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                          StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
             {
-                rateLimiter.acquire(mutationSize);
                 if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                     mutation.apply();
                 else if (FailureDetector.instance.isAlive(endpoint))
                     liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
                 else
-                    hintEndpoints.add(endpoint);
+                    StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
             }
 
-            if (!liveEndpoints.isEmpty())
-                hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+            if (liveEndpoints.isEmpty())
+                return null;
 
-            for (InetAddress endpoint : hintEndpoints)
-                StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
-
-            liveEndpoints.clear();
-            hintEndpoints.clear();
+            ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler(liveEndpoints);
+            MessageOut<Mutation> message = mutation.createMessage();
+            for (InetAddress endpoint : liveEndpoints)
+                MessagingService.instance().sendRR(message, endpoint, handler, false);
+            return handler;
         }
-    }
 
-    // Returns the endpoints we failed to deliver to.
-    private Set<InetAddress> attemptDirectDelivery(Mutation mutation, List<InetAddress> endpoints) throws IOException
-    {
-        final List<WriteResponseHandler> handlers = new ArrayList<>();
-        final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
-
-        for (final InetAddress ep : endpoints)
+        /*
+         * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+         * This ensures that deletes aren't "undone" by an old batch replay.
+         */
+        private int calculateHintTTL(Collection<Mutation> mutations)
         {
-            Runnable callback = new Runnable()
-            {
-                public void run()
-                {
-                    undelivered.remove(ep);
-                }
-            };
-            WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
-            MessagingService.instance().sendRR(mutation.createMessage(), ep, handler, false);
-            handlers.add(handler);
+            int unadjustedTTL = Integer.MAX_VALUE;
+            for (Mutation mutation : mutations)
+                unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
+            return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
         }
 
-        // Wait for all the requests to complete.
-        for (WriteResponseHandler handler : handlers)
+        private static class ReplayWriteResponseHandler extends WriteResponseHandler
         {
-            try
+            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+
+            public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
             {
-                handler.get();
+                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+                undelivered.addAll(writeEndpoints);
             }
-            catch (WriteTimeoutException e)
+
+            @Override
+            protected int totalBlockFor()
             {
-                logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+                return this.naturalEndpoints.size();
             }
-        }
-        return undelivered;
-    }
-
-    /*
-     * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
-     * This ensures that deletes aren't "undone" by an old batch replay.
-     */
-    private int calculateHintTTL(List<Mutation> mutations, long writtenAt)
-    {
-        int unadjustedTTL = Integer.MAX_VALUE;
-        for (Mutation mutation : mutations)
-            unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
-        return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
+
+            @Override
+            public void response(MessageIn m)
+            {
+                boolean removed = undelivered.remove(m.from);
+                assert removed;
+                super.response(m);
+            }
+        }
     }
 
     // force flush + compaction to reclaim space from the replayed batches
