Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 8a5365ebd -> c6efd35cf
Add server side batching to native transport Patch by bes; reviewed by tjake for CASSANDRA-5663 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6efd35c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6efd35c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6efd35c Branch: refs/heads/cassandra-2.1 Commit: c6efd35cf20db59c917b590100a7d09555fa4854 Parents: 8a5365e Author: Jake Luciani <[email protected]> Authored: Wed May 14 13:10:32 2014 -0400 Committer: Jake Luciani <[email protected]> Committed: Wed May 14 13:10:32 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/transport/Message.java | 117 +++++++++++++++++-- 2 files changed, 110 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6efd35c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5fd5a8b..3dd47a1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * Optimize netty server (CASSANDRA-6861) * Fix repair hang when given CF does not exist (CASSANDRA-7189) * Allow c* to be shutdown in an embedded mode (CASSANDRA-5635) + * Add server side batching to native transport (CASSANDRA-5663) Merged from 2.0: * (Hadoop) Close java driver Cluster in CQLRR.close (CASSANDRA-7228) * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6efd35c/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index e39e02c..0ad4312 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -17,9 +17,16 @@ */ package org.apache.cassandra.transport; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; import io.netty.channel.*; @@ -302,6 +309,87 @@ public abstract class Message @ChannelHandler.Sharable public static class Dispatcher extends SimpleChannelInboundHandler<Request> { + private static class FlushItem + { + final ChannelHandlerContext ctx; + final Response response; + private FlushItem(ChannelHandlerContext ctx, Response response) + { + this.ctx = ctx; + this.response = response; + } + } + + private final class Flusher implements Runnable + { + final EventLoop eventLoop; + final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>(); + final AtomicBoolean running = new AtomicBoolean(false); + final HashSet<ChannelHandlerContext> channels = new HashSet<>(); + final List<FlushItem> flushed = new ArrayList<>(); + int runsSinceFlush = 0; + int runsWithNoWork = 0; + private Flusher(EventLoop eventLoop) + { + this.eventLoop = eventLoop; + } + void start() + { + if (running.compareAndSet(false, true)) + { + this.eventLoop.execute(this); + } + } + public void run() + { + + boolean doneWork = false; + FlushItem flush; + while ( null != (flush = queued.poll()) ) + { + channels.add(flush.ctx); + flush.ctx.write(flush.response, flush.ctx.voidPromise()); + flushed.add(flush); + doneWork = true; + } + + runsSinceFlush++; + + if (!doneWork || runsSinceFlush > 2 || flushed.size() > 50) + { + for (ChannelHandlerContext channel : channels) + channel.flush(); + for (FlushItem item : flushed) + { + if (item.response.getSourceFrame().body.refCnt() > 0) + item.response.getSourceFrame().release(); + } + channels.clear(); + flushed.clear(); + runsSinceFlush = 0; + } + + if (doneWork) + { + runsWithNoWork = 0; + } + else + { + // either reschedule or cancel + if (++runsWithNoWork > 5) + { + running.set(false); + if (queued.isEmpty() || !running.compareAndSet(false, true)) + return; + } + } + + eventLoop.schedule(this, 10000, TimeUnit.NANOSECONDS); + } + } + + private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>(); + public Dispatcher() { super(false); @@ -310,32 +398,45 @@ public abstract class Message @Override public void channelRead0(ChannelHandlerContext ctx, Request request) { + + final Response response; + final ServerConnection connection; + try { assert request.connection() instanceof ServerConnection; - ServerConnection connection = (ServerConnection)request.connection(); + connection = (ServerConnection)request.connection(); QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId()); logger.debug("Received: {}, v={}", request, connection.getVersion()); - Response response = request.execute(qstate); + response = request.execute(qstate); response.setStreamId(request.getStreamId()); response.attach(connection); response.setSourceFrame(request.getSourceFrame()); connection.applyStateTransition(request.type, response.type); - - logger.debug("Responding: {}, v={}", response, connection.getVersion()); - - ctx.writeAndFlush(response, ctx.voidPromise()); } catch (Throwable ex) { + request.getSourceFrame().release(); // Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID. ctx.writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()), ctx.voidPromise()); + return; } - finally { - request.getSourceFrame().release(); + + logger.debug("Responding: {}, v={}", response, connection.getVersion()); + + EventLoop loop = ctx.channel().eventLoop(); + Flusher flusher = flusherLookup.get(loop); + if (flusher == null) + { + Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop)); + if (alt != null) + flusher = alt; } + + flusher.queued.add(new FlushItem(ctx, response)); + flusher.start(); } @Override
