Configurable transport for CFRR/CFRW. Patch by Piotr Kołaczkowski, reviewed by brandonwilliams for CASSANDRA-4558
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7db46ef8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7db46ef8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7db46ef8 Branch: refs/heads/trunk Commit: 7db46ef80acc567a6ac3e4edcfcf39a6b22b73fa Parents: f5619bb Author: Brandon Williams <[email protected]> Authored: Mon Aug 20 15:27:53 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Mon Aug 20 15:27:53 2012 -0500 ---------------------------------------------------------------------- .../cassandra/hadoop/ColumnFamilyInputFormat.java | 2 +- .../cassandra/hadoop/ColumnFamilyOutputFormat.java | 12 ++- .../cassandra/hadoop/ColumnFamilyRecordReader.java | 6 +- .../org/apache/cassandra/hadoop/ConfigHelper.java | 55 ++++++++++++--- .../apache/cassandra/thrift/ITransportFactory.java | 36 ++++++++++ .../cassandra/thrift/TFramedTransportFactory.java | 37 ++++++++++ 6 files changed, 131 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index 354903d..cb79b01 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -252,7 +252,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B try { - Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getInputRpcPort(conf), true); + Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf)); client.set_keyspace(keyspace); return 
client.describe_splits(cfName, range.start_token, range.end_token, splitsize); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java index 668c4aa..e01ada5 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,9 +36,10 @@ import org.apache.cassandra.thrift.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; import org.apache.thrift.TException; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; +import javax.security.auth.login.LoginException; + /** * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific * OutputFormat that allows reduce tasks to store keys (and corresponding @@ -149,11 +151,12 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat * @throws AuthorizationException */ public static Cassandra.Client createAuthenticatedClient(TSocket socket, Configuration conf) - throws InvalidRequestException, TException, AuthenticationException, AuthorizationException + throws InvalidRequestException, TException, AuthenticationException, AuthorizationException, LoginException { - TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket)); + logger.debug("Creating authenticated client for CF output format"); + TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket); + TBinaryProtocol binaryProtocol = 
new TBinaryProtocol(transport); Cassandra.Client client = new Cassandra.Client(binaryProtocol); - socket.open(); client.set_keyspace(ConfigHelper.getOutputKeyspace(conf)); if (ConfigHelper.getOutputKeyspaceUserName(conf) != null) { @@ -163,6 +166,7 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat AuthenticationRequest authRequest = new AuthenticationRequest(creds); client.login(authRequest); } + logger.debug("Authenticated client for CF output format created successfully"); return client; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index d35f142..fc90e5c 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.*; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,6 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.thrift.TException; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> @@ -160,9 +160,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap // create connection using thrift String location = getLocation(); socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf)); - TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket)); + 
TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket); + TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport); client = new Cassandra.Client(binaryProtocol); - socket.open(); // log in client.set_keyspace(keyspace); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 87dd5e0..1646635 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -40,11 +40,12 @@ import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import javax.security.auth.login.LoginException; + public class ConfigHelper { @@ -73,6 +74,8 @@ public class ConfigHelper private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class"; private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length"; + private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class"; + private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class"; private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class); @@ -462,7 +465,7 @@ public class ConfigHelper return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","), ConfigHelper.getInputRpcPort(conf)); } - public static 
Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws IOException + public static Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws IOException { return getClientFromAddressList(conf, ConfigHelper.getOutputInitialAddress(conf).split(","), ConfigHelper.getOutputRpcPort(conf)); } @@ -475,7 +478,7 @@ public class ConfigHelper { try { - client = createConnection(address, port, true); + client = createConnection(conf, address, port); break; } catch (IOException ioe) @@ -495,19 +498,53 @@ public class ConfigHelper return client; } - public static Cassandra.Client createConnection(String host, Integer port, boolean framed) + public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException { - TSocket socket = new TSocket(host, port); - TTransport trans = framed ? new TFramedTransport(socket) : socket; try { - trans.open(); + TSocket socket = new TSocket(host, port); + TTransport transport = getInputTransportFactory(conf).openTransport(socket); + return new Cassandra.Client(new TBinaryProtocol(transport)); + } + catch (LoginException e) + { + throw new IOException("Unable to login to server " + host + ":" + port, e); } catch (TTransportException e) { - throw new IOException("unable to connect to server", e); + throw new IOException("Unable to connect to server " + host + ":" + port, e); + } + } + + public static ITransportFactory getInputTransportFactory(Configuration conf) + { + return getTransportFactory(conf.get(INPUT_TRANSPORT_FACTORY_CLASS, TFramedTransportFactory.class.getName())); + } + + public static void setInputTransportFactoryClass(Configuration conf, String classname) + { + conf.set(INPUT_TRANSPORT_FACTORY_CLASS, classname); + } + + public static ITransportFactory getOutputTransportFactory(Configuration conf) + { + return getTransportFactory(conf.get(OUTPUT_TRANSPORT_FACTORY_CLASS, TFramedTransportFactory.class.getName())); + } + + public static void 
setOutputTransportFactoryClass(Configuration conf, String classname) + { + conf.set(OUTPUT_TRANSPORT_FACTORY_CLASS, classname); + } + + private static ITransportFactory getTransportFactory(String factoryClassName) { + try + { + return (ITransportFactory) Class.forName(factoryClassName).newInstance(); + } + catch (Exception e) + { + throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName, e); } - return new Cassandra.Client(new TBinaryProtocol(trans)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/thrift/ITransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java new file mode 100644 index 0000000..47cd034 --- /dev/null +++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java @@ -0,0 +1,36 @@ +package org.apache.cassandra.thrift; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import javax.security.auth.login.LoginException; +import java.io.IOException; + + +public interface ITransportFactory +{ + TTransport openTransport(TSocket socket) throws LoginException, TTransportException; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java new file mode 100644 index 0000000..09ae99e --- /dev/null +++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java @@ -0,0 +1,37 @@ +package org.apache.cassandra.thrift; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class TFramedTransportFactory implements ITransportFactory +{ + public TTransport openTransport(TSocket socket) throws TTransportException + { + TTransport transport = new TFramedTransport(socket); + transport.open(); + return transport; + } +}
