[ https://issues.apache.org/jira/browse/CASSANDRA-1896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Brandon Williams updated CASSANDRA-1896:
----------------------------------------
Attachment: 1896.txt
Patchified, redundant logic removed.
> Improve throughput by adding buffering to the inter-node TCP communication
> --------------------------------------------------------------------------
>
> Key: CASSANDRA-1896
> URL: https://issues.apache.org/jira/browse/CASSANDRA-1896
> Project: Cassandra
> Issue Type: Improvement
> Components: Core
> Reporter: Tsiki
> Assignee: Brandon Williams
> Attachments: 1896.txt
>
>
> The inbound and outbound TCP implementation under org.apache.cassandra.net
> does not buffer the socket streams. A simple change in IncomingTcpConnection
> and OutboundTcpConnection may give a rather big throughput increase. In my
> tests, I got up to 30% more out of my cluster. Below is the diff of these
> two files with buffering included. The diff is against release 0.6.5 but can
> quite simply be applied to 0.7 as well. I suggest limiting the buffered
> input stream I added in IncomingTcpConnection to 4K. The Outbound
> implementation can surely be implemented a bit better (by removing some of
> the code I duplicated there).
> diff -r apache-cassandra-0.6.5-src/src/java/org/apache/cassandra/net/IncomingTcpConnection.java fix/apache-cassandra-0.6.5-src/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
> 44c44
> < input = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
> ---
> > input = new DataInputStream(socket.getInputStream());
> diff -r apache-cassandra-0.6.5-src/src/java/org/apache/cassandra/net/OutboundTcpConnection.java fix/apache-cassandra-0.6.5-src/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
> 77d76
> < byte[] buf = new byte[4096];
> 80,112c79,89
> < int l = 0;
> < ByteBuffer bb;
> < while ((bb = queue.peek()) != null && l+bb.limit() < buf.length) {
> < bb = take();
> < System.arraycopy(bb.array(), 0, buf, l, bb.limit());
> < l += bb.limit();
> < }
> < if (l == 0) {
> < bb = take();
> < if (bb == CLOSE_SENTINEL)
> < {
> < disconnect();
> < continue;
> < }
> < if (socket != null || connect())
> < writeConnected(bb);
> < else
> < // clear out the queue, else gossip messages back up.
> < queue.clear();
> < } else {
> < if (socket != null || connect()) {
> < try {
> < output.write(buf, 0, l);
> < if (queue.peek() == null)
> < output.flush();
> < } catch (IOException e) {
> < logger.info("error writing to " + endpoint);
> < disconnect();
> < }
> < } else {
> < queue.clear();
> < }
> < }
> ---
> > ByteBuffer bb = take();
> > if (bb == CLOSE_SENTINEL)
> > {
> > disconnect();
> > continue;
> > }
> > if (socket != null || connect())
> > writeConnected(bb);
> > else
> > // clear out the queue, else gossip messages back up.
> > queue.clear();
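
For readers who want to see the effect of the buffering described above in
isolation, here is a minimal, self-contained sketch. It is not the attached
patch and not Cassandra code: the class BufferedDrainSketch, the
CountingStream stand-in for a socket stream, and the drain and sendMessages
helpers are all hypothetical names made up for illustration. It also swaps
the hand-rolled byte[] copy from the diff for a plain
java.io.BufferedOutputStream sized at the 4K mentioned in the description,
and it leaves out the CLOSE_SENTINEL and reconnect handling entirely. The
idea it tries to show is only this: write every queued message through a
buffer and flush once the queue is empty, so that many small messages reach
the underlying stream in a few large writes.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; none of these names exist in Cassandra.
final class BufferedDrainSketch {
    // 4K, the size suggested in the issue description.
    static final int BUFFER_SIZE = 4096;

    // Stand-in for a socket output stream: counts the writes that reach it.
    static final class CountingStream extends ByteArrayOutputStream {
        int writeCalls = 0;

        @Override
        public synchronized void write(byte[] b, int off, int len) {
            writeCalls++;
            super.write(b, off, len);
        }

        @Override
        public synchronized void write(int b) {
            writeCalls++;
            super.write(b);
        }
    }

    // Blocks for the first message, writes everything currently queued,
    // and flushes only once the queue has been emptied.
    static void drain(BlockingQueue<ByteBuffer> queue, OutputStream out)
            throws IOException, InterruptedException {
        ByteBuffer bb = queue.take();
        while (bb != null) {
            out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
            bb = queue.poll(); // non-blocking; null when the queue is empty
        }
        out.flush();
    }

    // Queues `count` messages of `size` bytes (count must be > 0) and returns
    // how many write calls reached the underlying stream.
    static int sendMessages(int count, int size, boolean buffered) {
        CountingStream sink = new CountingStream();
        OutputStream out = buffered ? new BufferedOutputStream(sink, BUFFER_SIZE) : sink;
        BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queue.add(ByteBuffer.wrap(new byte[size]));
        }
        try {
            drain(queue, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (sink.size() != count * size) {
            throw new IllegalStateException("bytes were lost");
        }
        return sink.writeCalls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered writes: " + sendMessages(100, 100, false));
        System.out.println("buffered writes:   " + sendMessages(100, 100, true));
    }
}
```

The unbuffered run issues one write per message, while the buffered run
issues far fewer. On a real socket each of those writes is typically a
system call, which is presumably where the reported throughput gain comes
from, though this sketch does not measure that.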