Updated Branches:
  refs/heads/cassandra-2.0 20c2adc87 -> 41ffca128
  refs/heads/trunk 025474177 -> 9c9552aea
Fix NPE when streaming connection is not yet ready

patch by yukim; reviewed by Russell Spitzer for CASSANDRA-6210

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41ffca12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41ffca12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41ffca12

Branch: refs/heads/cassandra-2.0
Commit: 41ffca1281dcdc69b1b843b47a5bb6dc3c462aac
Parents: 20c2adc
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 28 09:17:30 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 28 09:17:30 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/ConnectionHandler.java  | 117 +++++++++----------
 .../streaming/messages/StreamMessage.java       |   3 +-
 3 files changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 68727dc..46b14fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
   * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
   * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
   * Add support for 2.1 global counter shards (CASSANDRA-6505)
+ * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
  Merged from 1.2:
   * fsync compression metadata (CASSANDRA-6531)
   * Validate CF existence on execution for prepared statement (CASSANDRA-6535)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java 
b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 57f76a7..356138b 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -64,6 +65,8 @@ public class ConnectionHandler ConnectionHandler(StreamSession session) { this.session = session; + this.incoming = new IncomingMessageHandler(session); + this.outgoing = new OutgoingMessageHandler(session); } /** @@ -77,15 +80,13 @@ public class ConnectionHandler { logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId()); Socket incomingSocket = connect(session.peer); - incoming = new IncomingMessageHandler(session, incomingSocket, StreamMessage.CURRENT_VERSION); - incoming.sendInitMessage(true); - incoming.start(); + incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION); + incoming.sendInitMessage(incomingSocket, true); logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId()); Socket outgoingSocket = connect(session.peer); - outgoing = new OutgoingMessageHandler(session, outgoingSocket, StreamMessage.CURRENT_VERSION); - outgoing.sendInitMessage(false); - outgoing.start(); + outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION); + outgoing.sendInitMessage(outgoingSocket, false); } /** @@ -98,15 +99,9 @@ public class ConnectionHandler public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException { if (isForOutgoing) - { - outgoing = new OutgoingMessageHandler(session, socket, version); - outgoing.start(); - } + outgoing.start(socket, version); else - { - incoming = new IncomingMessageHandler(session, socket, version); 
- incoming.start(); - } + incoming.start(socket, version); } /** @@ -189,21 +184,19 @@ public class ConnectionHandler { protected final StreamSession session; - protected final Socket socket; - protected final int protocolVersion; + protected int protocolVersion; + protected Socket socket; private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>(); - protected MessageHandler(StreamSession session, Socket socket, int protocolVersion) + protected MessageHandler(StreamSession session) { this.session = session; - this.socket = socket; - this.protocolVersion = protocolVersion; } protected abstract String name(); - protected WritableByteChannel getWriteChannel() throws IOException + protected static WritableByteChannel getWriteChannel(Socket socket) throws IOException { WritableByteChannel out = socket.getChannel(); // socket channel is null when encrypted(SSL) @@ -212,7 +205,7 @@ public class ConnectionHandler : out; } - protected ReadableByteChannel getReadChannel() throws IOException + protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException { ReadableByteChannel in = socket.getChannel(); // socket channel is null when encrypted(SSL) @@ -221,14 +214,19 @@ public class ConnectionHandler : in; } - public void sendInitMessage(boolean isForOutgoing) throws IOException + public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException { StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing); - getWriteChannel().write(message.createMessage(false, protocolVersion)); + ByteBuffer messageBuf = message.createMessage(false, protocolVersion); + while (messageBuf.hasRemaining()) + getWriteChannel(socket).write(messageBuf); } - public void start() + public void start(Socket socket, int protocolVersion) { + this.socket = socket; + this.protocolVersion = protocolVersion; + new Thread(this, name() + "-" + 
session.peer).start(); } @@ -264,12 +262,9 @@ public class ConnectionHandler */ static class IncomingMessageHandler extends MessageHandler { - private final ReadableByteChannel in; - - IncomingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException + IncomingMessageHandler(StreamSession session) { - super(session, socket, protocolVersion); - this.in = getReadChannel(); + super(session); } protected String name() @@ -279,9 +274,10 @@ public class ConnectionHandler public void run() { - while (!isClosed()) + try { - try + ReadableByteChannel in = getReadChannel(socket); + while (!isClosed()) { // receive message StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session); @@ -293,17 +289,20 @@ public class ConnectionHandler session.messageReceived(message); } } - catch (SocketException e) - { - // socket is closed - close(); - } - catch (Throwable e) - { - session.onError(e); - } } - signalCloseDone(); + catch (SocketException e) + { + // socket is closed + close(); + } + catch (Throwable e) + { + session.onError(e); + } + finally + { + signalCloseDone(); + } } } @@ -326,12 +325,9 @@ public class ConnectionHandler } }); - private final WritableByteChannel out; - - OutgoingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException + OutgoingMessageHandler(StreamSession session) { - super(session, socket, protocolVersion); - this.out = getWriteChannel(); + super(session); } protected String name() @@ -346,30 +342,33 @@ public class ConnectionHandler public void run() { - StreamMessage next; - while (!isClosed()) + try { - try + WritableByteChannel out = getWriteChannel(socket); + + StreamMessage next; + while (!isClosed()) { if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null) { logger.debug("[Stream #{}] Sending {}", session.planId(), next); - sendMessage(next); + sendMessage(out, next); if (next.type == StreamMessage.Type.SESSION_FAILED) close(); } } - catch 
(InterruptedException e) - { - throw new AssertionError(e); - } - } - try - { // Sends the last messages on the queue while ((next = messageQueue.poll()) != null) - sendMessage(next); + sendMessage(out, next); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (IOException e) + { + session.onError(e); } finally { @@ -377,7 +376,7 @@ public class ConnectionHandler } } - private void sendMessage(StreamMessage message) + private void sendMessage(WritableByteChannel out, StreamMessage message) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 2e7341b..9e146e8 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -40,7 +40,8 @@ public abstract class StreamMessage // message type buff.put(message.type.type); buff.flip(); - out.write(buff); + while (buff.hasRemaining()) + out.write(buff); message.type.serializer.serialize(message, out, version, session); }