Configurable transport for CFRR/CFRW (ColumnFamilyRecordReader/ColumnFamilyRecordWriter).
Patch by Piotr Kołaczkowski; reviewed by brandonwilliams for CASSANDRA-4558.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7db46ef8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7db46ef8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7db46ef8

Branch: refs/heads/trunk
Commit: 7db46ef80acc567a6ac3e4edcfcf39a6b22b73fa
Parents: f5619bb
Author: Brandon Williams <[email protected]>
Authored: Mon Aug 20 15:27:53 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Mon Aug 20 15:27:53 2012 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/ColumnFamilyInputFormat.java  |    2 +-
 .../cassandra/hadoop/ColumnFamilyOutputFormat.java |   12 ++-
 .../cassandra/hadoop/ColumnFamilyRecordReader.java |    6 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   55 ++++++++++++---
 .../apache/cassandra/thrift/ITransportFactory.java |   36 ++++++++++
 .../cassandra/thrift/TFramedTransportFactory.java  |   37 ++++++++++
 6 files changed, 131 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 354903d..cb79b01 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -252,7 +252,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
 
             try
             {
-                Cassandra.Client client = ConfigHelper.createConnection(host, 
ConfigHelper.getInputRpcPort(conf), true);
+                Cassandra.Client client = ConfigHelper.createConnection(conf, 
host, ConfigHelper.getInputRpcPort(conf));
                 client.set_keyspace(keyspace);
                 return client.describe_splits(cfName, range.start_token, 
range.end_token, splitsize);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 668c4aa..e01ada5 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,9 +36,10 @@ import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 
+import javax.security.auth.login.LoginException;
+
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
@@ -149,11 +151,12 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
      * @throws AuthorizationException
      */
     public static Cassandra.Client createAuthenticatedClient(TSocket socket, 
Configuration conf)
-    throws InvalidRequestException, TException, AuthenticationException, 
AuthorizationException
+            throws InvalidRequestException, TException, 
AuthenticationException, AuthorizationException, LoginException
     {
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(new 
TFramedTransport(socket));
+        logger.debug("Creating authenticated client for CF output format");
+        TTransport transport = 
ConfigHelper.getOutputTransportFactory(conf).openTransport(socket);
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-        socket.open();
         client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
         if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
         {
@@ -163,6 +166,7 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
             AuthenticationRequest authRequest = new 
AuthenticationRequest(creds);
             client.login(authRequest);
         }
+        logger.debug("Authenticated client for CF output format created 
successfully");
         return client;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index d35f142..fc90e5c 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.*;
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +49,6 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, 
SortedMap<ByteBuffer, IColumn>>
@@ -160,9 +160,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             // create connection using thrift
             String location = getLocation();
             socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf));
-            TBinaryProtocol binaryProtocol = new TBinaryProtocol(new 
TFramedTransport(socket));
+            TTransport transport = 
ConfigHelper.getInputTransportFactory(conf).openTransport(socket);
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
             client = new Cassandra.Client(binaryProtocol);
-            socket.open();
 
             // log in
             client.set_keyspace(keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 87dd5e0..1646635 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -40,11 +40,12 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import javax.security.auth.login.LoginException;
+
 
 public class ConfigHelper
 {
@@ -73,6 +74,8 @@ public class ConfigHelper
     private static final String WRITE_CONSISTENCY_LEVEL = 
"cassandra.consistencylevel.write";
     private static final String OUTPUT_COMPRESSION_CLASS = 
"cassandra.output.compression.class";
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = 
"cassandra.output.compression.length";
+    private static final String INPUT_TRANSPORT_FACTORY_CLASS = 
"cassandra.input.transport.factory.class";
+    private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = 
"cassandra.output.transport.factory.class";
 
     private static final Logger logger = 
LoggerFactory.getLogger(ConfigHelper.class);
 
@@ -462,7 +465,7 @@ public class ConfigHelper
         return getClientFromAddressList(conf, 
ConfigHelper.getInputInitialAddress(conf).split(","), 
ConfigHelper.getInputRpcPort(conf));
     }
 
-        public static Cassandra.Client 
getClientFromOutputAddressList(Configuration conf) throws IOException
+    public static Cassandra.Client 
getClientFromOutputAddressList(Configuration conf) throws IOException
     {
         return getClientFromAddressList(conf, 
ConfigHelper.getOutputInitialAddress(conf).split(","), 
ConfigHelper.getOutputRpcPort(conf));
     }
@@ -475,7 +478,7 @@ public class ConfigHelper
         {
             try
             {
-                client = createConnection(address, port, true);
+                client = createConnection(conf, address, port);
                 break;
             }
             catch (IOException ioe)
@@ -495,19 +498,53 @@ public class ConfigHelper
         return client;
     }
 
-    public static Cassandra.Client createConnection(String host, Integer port, 
boolean framed)
+    public static Cassandra.Client createConnection(Configuration conf, String 
host, Integer port)
             throws IOException
     {
-        TSocket socket = new TSocket(host, port);
-        TTransport trans = framed ? new TFramedTransport(socket) : socket;
         try
         {
-            trans.open();
+            TSocket socket = new TSocket(host, port);
+            TTransport transport = 
getInputTransportFactory(conf).openTransport(socket);
+            return new Cassandra.Client(new TBinaryProtocol(transport));
+        }
+        catch (LoginException e)
+        {
+            throw new IOException("Unable to login to server " + host + ":" + 
port, e);
         }
         catch (TTransportException e)
         {
-            throw new IOException("unable to connect to server", e);
+            throw new IOException("Unable to connect to server " + host + ":" 
+ port, e);
+        }
+    }
+
+    public static ITransportFactory getInputTransportFactory(Configuration 
conf)
+    {
+        return getTransportFactory(conf.get(INPUT_TRANSPORT_FACTORY_CLASS, 
TFramedTransportFactory.class.getName()));
+    }
+
+    public static void setInputTransportFactoryClass(Configuration conf, 
String classname)
+    {
+        conf.set(INPUT_TRANSPORT_FACTORY_CLASS, classname);
+    }
+
+    public static ITransportFactory getOutputTransportFactory(Configuration 
conf)
+    {
+        return getTransportFactory(conf.get(OUTPUT_TRANSPORT_FACTORY_CLASS, 
TFramedTransportFactory.class.getName()));
+    }
+
+    public static void setOutputTransportFactoryClass(Configuration conf, 
String classname)
+    {
+        conf.set(OUTPUT_TRANSPORT_FACTORY_CLASS, classname);
+    }
+
+    private static ITransportFactory getTransportFactory(String 
factoryClassName) {
+        try
+        {
+            return (ITransportFactory) 
Class.forName(factoryClassName).newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Failed to instantiate transport 
factory:" + factoryClassName, e);
         }
-        return new Cassandra.Client(new TBinaryProtocol(trans));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/thrift/ITransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
new file mode 100644
index 0000000..47cd034
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
@@ -0,0 +1,36 @@
+package org.apache.cassandra.thrift;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+
+
+public interface ITransportFactory
+{
+    TTransport openTransport(TSocket socket) throws LoginException, 
TTransportException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
new file mode 100644
index 0000000..09ae99e
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -0,0 +1,37 @@
+package org.apache.cassandra.thrift;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class TFramedTransportFactory implements ITransportFactory
+{
+    public TTransport openTransport(TSocket socket) throws TTransportException
+    {
+        TTransport transport = new TFramedTransport(socket);
+        transport.open();
+        return transport;
+    }
+}

Reply via email to