Author: brandonwilliams Date: Thu Jan 6 23:33:45 2011 New Revision: 1056121
URL: http://svn.apache.org/viewvc?rev=1056121&view=rev Log: Don't begin buffering a connection until we've determined the type. Patch by Stu Hood, reviewed by gdusbabek for CASSANDRA-1943 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1056121&r1=1056120&r2=1056121&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Thu Jan 6 23:33:45 2011 @@ -32,40 +32,48 @@ public class IncomingTcpConnection exten { private static Logger logger = Logger.getLogger(IncomingTcpConnection.class); - private final DataInputStream input; private Socket socket; public IncomingTcpConnection(Socket socket) { assert socket != null; this.socket = socket; + } + + /** + * A new connection will either stream or message for its entire lifetime: because streaming + * bypasses the InputStream implementations to use sendFile, we cannot begin buffering until + * we've determined the type of the connection. + */ + @Override + public void run() + { + DataInputStream input; + boolean isStream; try { - input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); + // determine the connection type to decide whether to buffer + input = new DataInputStream(socket.getInputStream()); + MessagingService.validateMagic(input.readInt()); + int header = input.readInt(); + isStream = MessagingService.getBits(header, 3, 1) == 1; + if (!isStream) + // we should buffer + input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); } catch (IOException e) { close(); throw new IOError(e); } - } - - @Override - public void run() - { while (true) { try { - MessagingService.validateMagic(input.readInt()); - int header = input.readInt(); - int type = MessagingService.getBits(header, 1, 2); - boolean isStream = MessagingService.getBits(header, 3, 1) == 1; - int version = MessagingService.getBits(header, 15, 8); - if (isStream) { new IncomingStreamReader(socket.getChannel()).read(); + break; } else { @@ -76,6 +84,10 @@ public class IncomingTcpConnection exten Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes))); MessagingService.receive(message); } + // prepare to read the next message + MessagingService.validateMagic(input.readInt()); + int header = input.readInt(); + assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream; } catch (EOFException e) {
