Updated Branches: refs/heads/cassandra-2.0 cef6552c2 -> 2336d94ef
Remove Hadoop dependency from ITransportFactory patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-6062 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2336d94e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2336d94e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2336d94e Branch: refs/heads/cassandra-2.0 Commit: 2336d94ef3a286bc9b7331086085eca56f915e76 Parents: cef6552 Author: Aleksey Yeschenko <[email protected]> Authored: Thu Sep 19 21:05:15 2013 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Thu Sep 19 21:05:15 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/cli/CliMain.java | 28 +++++------------ .../org/apache/cassandra/cli/CliOptions.java | 15 +++++---- .../apache/cassandra/cli/CliSessionState.java | 6 ++-- .../hadoop/AbstractColumnFamilyInputFormat.java | 11 ++++--- .../AbstractColumnFamilyOutputFormat.java | 2 +- .../apache/cassandra/hadoop/ConfigHelper.java | 33 +++++++------------- .../cassandra/thrift/ITransportFactory.java | 14 +++------ .../thrift/TFramedTransportFactory.java | 17 +++++----- 9 files changed, 50 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ae6eedc..a25d03e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 2.0.2 * Make ThriftServer more easlly extensible (CASSANDRA-6058) + * Remove Hadoop dependency from ITransportFactory (CASSANDRA-6062) Merged from 1.2: * Allow cache-keys-to-save to be set at runtime (CASSANDRA-5980) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliMain.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliMain.java b/src/java/org/apache/cassandra/cli/CliMain.java index 144bd2e..77a9020 100644 --- a/src/java/org/apache/cassandra/cli/CliMain.java +++ b/src/java/org/apache/cassandra/cli/CliMain.java @@ -24,16 +24,15 @@ import java.io.IOException; import java.nio.charset.CharacterCodingException; import java.util.*; -import jline.ConsoleReader; -import jline.History; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import jline.ConsoleReader; +import jline.History; /** * Cassandra Command Line Interface (CLI) Main @@ -58,19 +57,12 @@ public class CliMain */ public static void connect(String server, int port) { - TSocket socket = new TSocket(server, port); - if (transport != null) transport.close(); - transport = sessionState.transportFactory.getTransport(socket); - TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); - Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol); - try { - if (!transport.isOpen()) - transport.open(); + transport = sessionState.transportFactory.openTransport(server, port); } catch (Exception e) { @@ -80,7 +72,8 @@ public class CliMain throw new RuntimeException("Exception connecting to " + server + "/" + port + ". Reason: " + error + "."); } - thriftClient = cassandraClient; + TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); + thriftClient = new Cassandra.Client(binaryProtocol); cliClient = new CliClient(sessionState, thriftClient); if ((sessionState.username != null) && (sessionState.password != null)) @@ -125,12 +118,7 @@ public class CliMain cliClient.setKeySpace(sessionState.keyspace); updateCompletor(CliUtils.getCfNamesByKeySpace(cliClient.getKSMetaData(sessionState.keyspace))); } - catch (InvalidRequestException e) - { - sessionState.err.println("Keyspace " + sessionState.keyspace + " not found"); - return; - } - catch (NotFoundException e) + catch (InvalidRequestException | NotFoundException e) { sessionState.err.println("Keyspace " + sessionState.keyspace + " not found"); return; @@ -201,7 +189,7 @@ public class CliMain completer.setCandidateStrings(strs); } - public static void processStatement(String query) throws CharacterCodingException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException + public static void processStatement(String query) throws CharacterCodingException, TException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException { cliClient.executeCLIStatement(query); } @@ -361,7 +349,7 @@ public class CliMain private static void evaluateFileStatements(BufferedReader reader) throws IOException { - String line = ""; + String line; String currentStatement = ""; boolean commentedBlock = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java index 464d092..68f17c9 100644 --- a/src/java/org/apache/cassandra/cli/CliOptions.java +++ b/src/java/org/apache/cassandra/cli/CliOptions.java @@ -18,7 +18,8 @@ package org.apache.cassandra.cli; import org.apache.commons.cli.*; -import org.apache.thrift.transport.TTransportFactory; + +import org.apache.cassandra.thrift.ITransportFactory; /** * @@ -74,7 +75,7 @@ public class CliOptions options.addOption(null, JMX_PORT_OPTION, "JMX-PORT", "JMX service port"); options.addOption(null, JMX_USERNAME_OPTION, "JMX-USERNAME", "JMX service username"); options.addOption(null, JMX_PASSWORD_OPTION, "JMX-PASSWORD", "JMX service password"); - options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified TTransportFactory class name for creating a connection to cassandra"); + options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified ITransportFactory class name for creating a connection to cassandra"); // ssl connection-related options options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore"); @@ -265,17 +266,15 @@ public class CliOptions } } - private static TTransportFactory validateAndSetTransportFactory(String transportFactory) + private static ITransportFactory validateAndSetTransportFactory(String transportFactory) { try { Class<?> factory = Class.forName(transportFactory); - - if(!TTransportFactory.class.isAssignableFrom(factory)) + if (!ITransportFactory.class.isAssignableFrom(factory)) throw new IllegalArgumentException(String.format("transport factory '%s' " + - "not derived from TTransportFactory", transportFactory)); - - return (TTransportFactory) factory.newInstance(); + "not derived from ITransportFactory", transportFactory)); + return (ITransportFactory) factory.newInstance(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliSessionState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliSessionState.java b/src/java/org/apache/cassandra/cli/CliSessionState.java index 9e833c6..f0de713 100644 --- a/src/java/org/apache/cassandra/cli/CliSessionState.java +++ b/src/java/org/apache/cassandra/cli/CliSessionState.java @@ -20,11 +20,11 @@ package org.apache.cassandra.cli; import java.io.InputStream; import java.io.PrintStream; -import org.apache.cassandra.cli.transport.FramedTransportFactory; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; +import org.apache.cassandra.thrift.ITransportFactory; +import org.apache.cassandra.thrift.TFramedTransportFactory; import org.apache.cassandra.tools.NodeProbe; -import org.apache.thrift.transport.TTransportFactory; /** * Used to hold the state for the CLI. @@ -44,7 +44,7 @@ public class CliSessionState public String jmxUsername; // JMX service username public String jmxPassword; // JMX service password public boolean verbose = false; // verbose output - public TTransportFactory transportFactory = new FramedTransportFactory(); + public ITransportFactory transportFactory = new TFramedTransportFactory(); public EncryptionOptions encOptions = new ClientEncryptionOptions(); /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java index 59c4bf7..d05b890 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java @@ -30,6 +30,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -53,11 +58,7 @@ import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y> { @@ -90,7 +91,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception { logger.debug("Creating authenticated client for CF input format"); - TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port, conf); + TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port); TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); Cassandra.Client client = new Cassandra.Client(binaryProtocol); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java index 5a03777..2040f61 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java @@ -120,7 +120,7 @@ public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputForma public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception { logger.debug("Creating authenticated client for CF output format"); - TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port, conf); + TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port); TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); Cassandra.Client client = new Cassandra.Client(binaryProtocol); client.set_keyspace(ConfigHelper.getOutputKeyspace(conf)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index a109b2f..ebfd3c0 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -23,13 +23,12 @@ package org.apache.cassandra.hadoop; import java.io.IOException; import java.util.*; -import com.google.common.collect.Maps; -import org.apache.cassandra.io.compress.CompressionParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Hex; @@ -41,7 +40,6 @@ import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TTransport; - public class ConfigHelper { private static final String INPUT_PARTITIONER_CONFIG = "cassandra.input.partitioner.class"; @@ -69,14 +67,10 @@ public class ConfigHelper private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class"; private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length"; - - private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class"; - private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class"; private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb"; private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class); - /** * Set the keyspace and column family for the input of this job. * @@ -88,13 +82,10 @@ public class ConfigHelper public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily, boolean widerows) { if (keyspace == null) - { throw new UnsupportedOperationException("keyspace may not be null"); - } + if (columnFamily == null) - { throw new UnsupportedOperationException("columnfamily may not be null"); - } conf.set(INPUT_KEYSPACE_CONFIG, keyspace); conf.set(INPUT_COLUMNFAMILY_CONFIG, columnFamily); @@ -122,9 +113,7 @@ public class ConfigHelper public static void setOutputKeyspace(Configuration conf, String keyspace) { if (keyspace == null) - { throw new UnsupportedOperationException("keyspace may not be null"); - } conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace); } @@ -562,12 +551,11 @@ public class ConfigHelper return client; } - public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) - throws IOException + public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException { try { - TTransport transport = getClientTransportFactory(conf).openTransport(host, port, conf); + TTransport transport = getClientTransportFactory(conf).openTransport(host, port); return new Cassandra.Client(new TBinaryProtocol(transport, true, true)); } catch (Exception e) @@ -578,16 +566,15 @@ public class ConfigHelper public static ITransportFactory getClientTransportFactory(Configuration conf) { - String factoryClassName = conf.get( - ITransportFactory.PROPERTY_KEY, - TFramedTransportFactory.class.getName()); + String factoryClassName = conf.get(ITransportFactory.PROPERTY_KEY, TFramedTransportFactory.class.getName()); ITransportFactory factory = getClientTransportFactory(factoryClassName); Map<String, String> options = getOptions(conf, factory.supportedOptions()); factory.setOptions(options); return factory; } - private static ITransportFactory getClientTransportFactory(String factoryClassName) { + private static ITransportFactory getClientTransportFactory(String factoryClassName) + { try { return (ITransportFactory) Class.forName(factoryClassName).newInstance(); @@ -597,8 +584,10 @@ public class ConfigHelper throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName, e); } } - private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions) { - Map<String, String> options = Maps.newHashMap(); + + private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions) + { + Map<String, String> options = new HashMap<>(); for (String optionKey : supportedOptions) { String optionValue = conf.get(optionKey); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/thrift/ITransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java index 98b1c03..7a65728 100644 --- a/src/java/org/apache/cassandra/thrift/ITransportFactory.java +++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java @@ -1,5 +1,3 @@ -package org.apache.cassandra.thrift; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,13 +18,12 @@ package org.apache.cassandra.thrift; * under the License. * */ - -import org.apache.hadoop.conf.Configuration; -import org.apache.thrift.transport.TTransport; +package org.apache.cassandra.thrift; import java.util.Map; import java.util.Set; +import org.apache.thrift.transport.TTransport; /** * Transport factory for establishing thrift connections from clients to a remote server. @@ -34,8 +31,6 @@ import java.util.Set; public interface ITransportFactory { static final String PROPERTY_KEY = "cassandra.client.transport.factory"; - static final String LONG_OPTION = "transport-factory"; - static final String SHORT_OPTION = "tr"; /** * Opens a client transport to a thrift server. @@ -48,16 +43,15 @@ public interface ITransportFactory * * @param host fully qualified hostname of the server * @param port RPC port of the server - * @param conf Hadoop configuration * @return open and ready to use transport * @throws Exception implementation defined; usually throws TTransportException or IOException * if the connection cannot be established */ - TTransport openTransport(String host, int port, Configuration conf) throws Exception; + TTransport openTransport(String host, int port) throws Exception; /** * Sets an implementation defined set of options. - * Keys in this map must conform to the set set returned by TClientTransportFactory#supportedOptions. + * Keys in this map must conform to the set set returned by ITransportFactory#supportedOptions. * @param options option map */ void setOptions(Map<String, String> options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java index 7d2d89e..a4c6bb7 100644 --- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java +++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java @@ -1,5 +1,3 @@ -package org.apache.cassandra.thrift; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,35 +18,38 @@ package org.apache.cassandra.thrift; * under the License. * */ +package org.apache.cassandra.thrift; import java.util.Collections; import java.util.Map; import java.util.Set; -import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; -import org.apache.hadoop.conf.Configuration; - public class TFramedTransportFactory implements ITransportFactory { - public TTransport openTransport(String host, int port, Configuration conf) throws TTransportException + private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb"; + private int thriftFramedTransportSizeMb = 15; // 15Mb is the default for C* & Hadoop ConfigHelper + + public TTransport openTransport(String host, int port) throws TTransportException { TSocket socket = new TSocket(host, port); - TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf)); + TTransport transport = new TFramedTransport(socket, thriftFramedTransportSizeMb * 1024 * 1024); transport.open(); return transport; } public void setOptions(Map<String, String> options) { + if (options.containsKey(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB)) + thriftFramedTransportSizeMb = Integer.parseInt(options.get(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB)); } public Set<String> supportedOptions() { - return Collections.emptySet(); + return Collections.singleton(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB); } }
