Update MessagingService protocol to allow version handshake with broadcast address identification.
Patch by jbellis; reviewed by brandonwilliams and Marcus Eriksson for CASSANDRA-4311.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c82a9d9b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c82a9d9b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c82a9d9b

Branch: refs/heads/trunk
Commit: c82a9d9b96667cc2afeb2609fbff955b4232fc2b
Parents: 15577ba
Author: Jonathan Ellis <[email protected]>
Authored: Wed Jun 6 12:33:10 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Jun 14 18:03:46 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../apache/cassandra/db/commitlog/CommitLog.java   |    1 -
 src/java/org/apache/cassandra/gms/Gossiper.java    |    4 +-
 .../cassandra/net/IncomingTcpConnection.java       |  157 +++++++++-----
 .../org/apache/cassandra/net/MessagingService.java |   22 +--
 .../cassandra/net/OutboundTcpConnection.java       |  122 +++++++-----
 .../cassandra/net/OutboundTcpConnectionPool.java   |   19 ++-
 .../apache/cassandra/net/io/SerializerType.java    |   26 ---
 8 files changed, 197 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b7d4861..2eb1b57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.2-dev
+ * update MS protocol with a version handshake + broadcast address id
+   (CASSANDRA-4311)
  * multithreaded hint replay (CASSANDRA-4189)
  * add inter-node message compression (CASSANDRA-3127)
  * enforce 1m min keycache for auto (CASSANDRA-4306)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 080cd22..6649ae4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.net.MessagingService;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index ecc3453..06dafe7 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -200,10 +200,10 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         subscribers.remove(subscriber);
     }
 
-    public void setVersion(InetAddress address, int version)
+    public Integer setVersion(InetAddress address, int version)
     {
         logger.debug("Setting version {} for {}", version, address);
-        versions.put(address, version);
+        return versions.put(address, version);
     }
 
     public void resetVersion(InetAddress endpoint)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 620fbdb..e808c7e 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -17,15 +17,13 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
+import java.io.*;
 import java.net.InetAddress;
 import java.net.Socket;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.xerial.snappy.SnappyInputStream;
 
 import org.apache.cassandra.gms.Gossiper;
@@ -57,61 +55,19 @@ public class IncomingTcpConnection extends Thread
         try
         {
             // determine the connection type to decide whether to buffer
-            DataInputStream input = new 
DataInputStream(socket.getInputStream());
-            MessagingService.validateMagic(input.readInt());
-            int header = input.readInt();
-            boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
+            DataInputStream in = new DataInputStream(socket.getInputStream());
+            MessagingService.validateMagic(in.readInt());
+            int header = in.readInt();
             boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
             int version = MessagingService.getBits(header, 15, 8);
+            logger.debug("Connection version {} from {}", version, 
socket.getInetAddress());
+
             if (isStream)
-            {
-                if (version == MessagingService.current_version)
-                {
-                    int size = input.readInt();
-                    byte[] headerBytes = new byte[size];
-                    input.readFully(headerBytes);
-                    stream(StreamHeader.serializer.deserialize(new 
DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input);
-                }
-                else
-                {
-                    // streaming connections are per-session and have a fixed 
version.  we can't do anything with a wrong-version stream connection, so drop 
it.
-                    logger.error("Received stream using protocol version {} 
(my version {}). Terminating connection",
-                                 version, MessagingService.current_version);
-                }
-                // We are done with this connection....
-                return;
-            }
-
-            // we should buffer
-            input = new DataInputStream(new 
BufferedInputStream(socket.getInputStream(), 4096));
-            // Receive the first message to set the version.
-            from = receiveMessage(input, version); // why? see => 
CASSANDRA-4099
-            logger.debug("Version for {} is {}", from, version);
-            if (version > MessagingService.current_version)
-            {
-                // save the endpoint so gossip will reconnect to it
-                Gossiper.instance.addSavedEndpoint(from);
-                logger.info("Received " + (isStream ? "streaming " : "") + 
"connection from newer protocol version. Ignoring");
-                return;
-            }
-            Gossiper.instance.setVersion(from, version);
-            logger.debug("set version for {} to {}", from, version);
-            if (compressed)
-            {
-                logger.debug("Upgrading incoming connection to be compressed");
-                input = new DataInputStream(new SnappyInputStream(input));
-            }
-            // loop to get the next message.
-            while (true)
-            {
-                // prepare to read the next message
-                MessagingService.validateMagic(input.readInt());
-                header = input.readInt();
-                assert isStream == (MessagingService.getBits(header, 3, 1) == 
1) : "Connections cannot change type: " + isStream;
-                version = MessagingService.getBits(header, 15, 8);
-                logger.trace("Version is now {}", version);
-                receiveMessage(input, version);
-            }
+                handleStream(in, version);
+            else if (version < MessagingService.VERSION_12)
+                handleLegacyVersion(version);
+            else
+                handleModernVersion(version, header);
         }
         catch (EOFException e)
         {
@@ -128,6 +84,95 @@ public class IncomingTcpConnection extends Thread
         }
     }
 
+    private void handleModernVersion(int version, int header) throws 
IOException
+    {
+        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
+        out.writeInt(MessagingService.current_version);
+        out.flush();
+
+        DataInputStream in = new DataInputStream(socket.getInputStream());
+        int maxVersion = in.readInt();
+        from = CompactEndpointSerializationHelper.deserialize(in);
+        boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
+
+        if (compressed)
+        {
+            logger.debug("Upgrading incoming connection to be compressed");
+            in = new DataInputStream(new 
SnappyInputStream(socket.getInputStream()));
+        }
+        else
+        {
+            in = new DataInputStream(new 
BufferedInputStream(socket.getInputStream(), 4096));
+        }
+
+        logger.debug("Max version for {} is {}", from, maxVersion);
+        if (version > MessagingService.current_version)
+        {
+            // save the endpoint so gossip will reconnect to it
+            Gossiper.instance.addSavedEndpoint(from);
+            logger.info("Received messages from newer protocol version {}. 
Ignoring", version);
+            return;
+        }
+        Gossiper.instance.setVersion(from, 
Math.min(MessagingService.current_version, maxVersion));
+        logger.debug("set version for {} to {}", from, 
Math.min(MessagingService.current_version, maxVersion));
+        // outbound side will reconnect if necessary to upgrade version
+
+        while (true)
+        {
+            MessagingService.validateMagic(in.readInt());
+            receiveMessage(in, version);
+        }
+    }
+
+    private void handleLegacyVersion(int version) throws IOException
+    {
+        DataInputStream in = new DataInputStream(new 
BufferedInputStream(socket.getInputStream(), 4096));
+
+        from = receiveMessage(in, version); // why? see => CASSANDRA-4099
+        logger.debug("Version for {} is {}", from, version);
+        if (version > MessagingService.current_version)
+        {
+            // save the endpoint so gossip will reconnect to it
+            Gossiper.instance.addSavedEndpoint(from);
+            logger.info("Received messages from newer protocol version. 
Ignoring");
+            return;
+        }
+        int lastVersion = Gossiper.instance.setVersion(from, version);
+        logger.debug("set version for {} to {}", from, version);
+        if (lastVersion < version)
+        {
+            logger.debug("breaking outbound connections to force version 
upgrade");
+            
MessagingService.instance().getConnectionPool(from).resetToNewerVersion(version);
+        }
+
+        while (true)
+        {
+            MessagingService.validateMagic(in.readInt());
+            int header = in.readInt(); // legacy protocol re-sends header for 
each message
+            assert !(MessagingService.getBits(header, 3, 1) == 1) : 
"Non-stream connection cannot change to stream";
+            version = MessagingService.getBits(header, 15, 8);
+            logger.trace("Version is now {}", version);
+            receiveMessage(in, version);
+        }
+    }
+
+    private void handleStream(DataInputStream input, int version) throws 
IOException
+    {
+        if (version == MessagingService.current_version)
+        {
+            int size = input.readInt();
+            byte[] headerBytes = new byte[size];
+            input.readFully(headerBytes);
+            stream(StreamHeader.serializer.deserialize(new DataInputStream(new 
FastByteArrayInputStream(headerBytes)), version), input);
+        }
+        else
+        {
+            // streaming connections are per-session and have a fixed version. 
 we can't do anything with a wrong-version stream connection, so drop it.
+            logger.error("Received stream using protocol version {} (my 
version {}). Terminating connection",
+                         version, MessagingService.current_version);
+        }
+    }
+
     private InetAddress receiveMessage(DataInputStream input, int version) 
throws IOException
     {
         if (version <= MessagingService.VERSION_11)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 6dd954d..ed9e5b3 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -54,7 +54,6 @@ import org.apache.cassandra.gms.GossipDigestSyn;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencySubscriber;
-import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.AntiEntropyService;
@@ -78,8 +77,6 @@ public final class MessagingService implements 
MessagingServiceMBean
     public static final int VERSION_12 = 5;
     public static final int current_version = VERSION_12;
 
-    static SerializerType serializerType = SerializerType.BINARY;
-
     /** we preface every message with this number so the recipient can 
validate the sender is sane */
     static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
@@ -721,34 +718,21 @@ public final class MessagingService implements 
MessagingServiceMBean
             throw new IOException("invalid protocol header");
     }
 
-    public static int getBits(int x, int p, int n)
+    public static int getBits(int packed, int start, int count)
     {
-        return x >>> (p + 1) - n & ~(-1 << n);
+        return packed >>> (start + 1) - count & ~(-1 << count);
     }
 
     public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean 
compress, int version)
     {
-        /*
-        Setting up the protocol header. This is 4 bytes long
-        represented as an integer. The first 2 bits indicate
-        the serializer type. The 3rd bit indicates if compression
-        is turned on or off. It is turned off by default. The 4th
-        bit indicates if we are in streaming mode. It is turned off
-        by default. The following 4 bits are reserved for future use.
-        The next 8 bits indicate a version number. Remaining 15 bits
-        are not used currently.
-        */
         int header = 0;
-        // Setting up the serializer bit
-        header |= serializerType.ordinal();
         // set compression bit.
-        if ( compress )
+        if (compress)
             header |= 4;
         // set streaming bit
         header |= 8;
         // Setting up the version bit
         header |= (version << 8);
-        /* Finished the protocol header setup */
 
         /* Adding the StreamHeader which contains the session Id along
          * with the pendingfile info for the stream.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 76f496c..a123072 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.net;
 
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -28,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
 import org.xerial.snappy.SnappyOutputStream;
 
 import org.apache.cassandra.config.Config;
@@ -53,21 +57,19 @@ public class OutboundTcpConnection extends Thread
     private Socket socket;
     private volatile long completed;
     private final AtomicLong dropped = new AtomicLong();
-    private boolean isUpgraded = false;
-    private boolean writeToLocalDC;
+    private int targetVersion;
 
     public OutboundTcpConnection(OutboundTcpConnectionPool pool)
     {
         super("WRITE-" + pool.endPoint());
-        this.writeToLocalDC = isSameDC(pool.endPoint());
         this.poolReference = pool;
     }
 
-    private static boolean isSameDC(InetAddress targetHost)
+    private static boolean isLocalDC(InetAddress targetHost)
     {
         String remoteDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
-        String thisDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(DatabaseDescriptor.getRpcAddress());
-        return remoteDC.equals(thisDC);
+        String localDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(DatabaseDescriptor.getRpcAddress());
+        return remoteDC.equals(localDC);
     }
 
     public void enqueue(MessageOut<?> message, String id)
@@ -95,6 +97,11 @@ public class OutboundTcpConnection extends Thread
         enqueue(CLOSE_SENTINEL, null);
     }
 
+    public int getTargetVersion()
+    {
+        return targetVersion;
+    }
+
     public void run()
     {
         while (true)
@@ -149,13 +156,11 @@ public class OutboundTcpConnection extends Thread
         return dropped.get();
     }
 
-    private boolean shouldUpgradeConnection()
+    private boolean shouldCompressConnection()
     {
-        if(Gossiper.instance.getVersion(poolReference.endPoint()) >= 
MessagingService.current_version &&
-                (DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.all ||
-                (DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.dc && !writeToLocalDC)))
-            return true;
-        return false;
+        // assumes version >= 1.2
+        return DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.all
+               || (DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint()));
     }
 
     private void writeConnected(MessageOut<?> message, String id)
@@ -168,13 +173,6 @@ public class OutboundTcpConnection extends Thread
             {
                 out.flush();
             }
-            if(!isUpgraded && shouldUpgradeConnection())
-            {
-                out.flush();
-                logger.debug("Upgrading OutputStream to be compressed");
-                out = new DataOutputStream(new SnappyOutputStream(new 
BufferedOutputStream(socket.getOutputStream())));
-                isUpgraded = true;
-            }
         }
         catch (Exception e)
         {
@@ -189,41 +187,37 @@ public class OutboundTcpConnection extends Thread
 
     public void write(MessageOut<?> message, String id, DataOutputStream out) 
throws IOException
     {
-        write(message, id, out, 
Gossiper.instance.getVersion(poolReference.endPoint()), 
shouldUpgradeConnection());
+        write(message, id, out, targetVersion);
     }
 
-    public static void write(MessageOut message, String id, DataOutputStream 
out, int version, boolean compressionEnabled) throws IOException
+    public static void write(MessageOut message, String id, DataOutputStream 
out, int version) throws IOException
     {
-        /*
-         Setting up the protocol header. This is 4 bytes long
-         represented as an integer. The first 2 bits indicate
-         the serializer type. The 3rd bit indicates if compression
-         is turned on or off. It is turned off by default. The 4th
-         bit indicates if we are in streaming mode. It is turned off
-         by default. The 5th-8th bits are reserved for future use.
-         The next 8 bits indicate a version number. Remaining 15 bits
-         are not used currently.
-        */
-        int header = 0;
-        // Setting up the serializer bit
-        header |= MessagingService.serializerType.ordinal();
-        // set compression bit.
-        if (compressionEnabled)
-            header |= 4;
-        // Setting up the version bit
-        header |= (version << 8);
-
         out.writeInt(MessagingService.PROTOCOL_MAGIC);
-        out.writeInt(header);
-
+        if (version < MessagingService.VERSION_12)
+            writeHeader(out, version, false);
         // 0.8 included a total message size int.  1.0 doesn't need it but 
expects it to be there.
-        if (version <= MessagingService.VERSION_11)
+        if (version <  MessagingService.VERSION_12)
             out.writeInt(-1);
 
         out.writeUTF(id);
         message.serialize(out, version);
     }
 
+    private static void writeHeader(DataOutputStream out, int version, boolean 
compressionEnabled) throws IOException
+    {
+        // 2 bits: unused.  used to be "serializer type," which was always 
Binary
+        // 1 bit: compression
+        // 1 bit: streaming mode
+        // 3 bits: unused
+        // 8 bits: version
+        // 15 bits: unused
+        int header = 0;
+        if (compressionEnabled)
+            header |= 4;
+        header |= (version << 8);
+        out.writeInt(header);
+    }
+
     private void disconnect()
     {
         if (socket != null)
@@ -244,9 +238,11 @@ public class OutboundTcpConnection extends Thread
 
     private boolean connect()
     {
-        isUpgraded = false;
         if (logger.isDebugEnabled())
             logger.debug("attempting to connect to " + 
poolReference.endPoint());
+
+        targetVersion = Gossiper.instance.getVersion(poolReference.endPoint());
+
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() < start + 
DatabaseDescriptor.getRpcTimeout())
         {
@@ -256,6 +252,41 @@ public class OutboundTcpConnection extends Thread
                 socket.setKeepAlive(true);
                 socket.setTcpNoDelay(true);
                 out = new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream(), 4096));
+
+                if (targetVersion >= MessagingService.VERSION_12)
+                {
+                    out.writeInt(MessagingService.PROTOCOL_MAGIC);
+                    writeHeader(out, targetVersion, 
shouldCompressConnection());
+                    out.flush();
+
+                    DataInputStream in = new 
DataInputStream(socket.getInputStream());
+                    int maxTargetVersion = in.readInt();
+                    if (targetVersion > maxTargetVersion)
+                    {
+                        logger.debug("Target max version is {}; will reconnect 
with that version", maxTargetVersion);
+                        Gossiper.instance.setVersion(poolReference.endPoint(), 
maxTargetVersion);
+                        disconnect();
+                        return false;
+                    }
+
+                    if (targetVersion < maxTargetVersion && targetVersion < 
MessagingService.current_version)
+                    {
+                        logger.debug("Detected higher max version {} (using 
{}); will reconnect when queued messages are done",
+                                     maxTargetVersion, targetVersion);
+                        Gossiper.instance.setVersion(poolReference.endPoint(), 
Math.min(MessagingService.current_version, maxTargetVersion));
+                        softCloseSocket();
+                    }
+
+                    out.writeInt(MessagingService.current_version);
+                    
CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), 
out);
+                    if (shouldCompressConnection())
+                    {
+                        out.flush();
+                        logger.debug("Upgrading OutputStream to be 
compressed");
+                        out = new DataOutputStream(new SnappyOutputStream(new 
BufferedOutputStream(socket.getOutputStream())));
+                    }
+                }
+
                 return true;
             }
             catch (IOException e)
@@ -273,9 +304,6 @@ public class OutboundTcpConnection extends Thread
                 }
             }
         }
-        // nodes can change dc during the lifetime of this 
OutboundTcpConnection, but we cannot upgrade a
-        // connection mid-flight, so it is safe to set here and dont change 
until the next connect()
-        this.writeToLocalDC = isSameDC(poolReference.endPoint());
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 45b7627..ce51cf0 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -61,10 +61,19 @@ public class OutboundTcpConnectionPool
                : cmdCon;
     }
 
-    synchronized void reset()
+    void reset()
     {
-        for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, 
ackCon })
-            con.closeSocket();
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
cmdCon, ackCon })
+            conn.closeSocket();
+    }
+
+    public void resetToNewerVersion(int version)
+    {
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
cmdCon, ackCon })
+        {
+            if (version > conn.getTargetVersion())
+                conn.softCloseSocket();
+        }
     }
 
     /**
@@ -75,8 +84,8 @@ public class OutboundTcpConnectionPool
     public void reset(InetAddress remoteEP)
     {
         resetedEndpoint = remoteEP;
-        for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, 
ackCon })
-            con.softCloseSocket();
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
cmdCon, ackCon })
+            conn.softCloseSocket();
     }
 
     public Socket newSocket() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/io/SerializerType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/io/SerializerType.java 
b/src/java/org/apache/cassandra/net/io/SerializerType.java
deleted file mode 100644
index ac5526f..0000000
--- a/src/java/org/apache/cassandra/net/io/SerializerType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net.io;
-
-public enum SerializerType
-{
-    BINARY,
-    JAVA,
-    XML,
-    JSON
-}

Reply via email to