Pluggable Thrift transport factories for CLI. Patch by Jason Brown and Pavel Yaskevich; reviewed by Pavel Yaskevich for CASSANDRA-4609.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59a6a5d8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59a6a5d8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59a6a5d8 Branch: refs/heads/trunk Commit: 59a6a5d82dc088dac8f1d98bc48a6426d75dc1a2 Parents: d170a7a Author: Pavel Yaskevich <[email protected]> Authored: Tue Oct 2 18:13:01 2012 -0700 Committer: Pavel Yaskevich <[email protected]> Committed: Wed Oct 3 22:16:56 2012 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/cli/CliMain.java | 14 +----- src/java/org/apache/cassandra/cli/CliOptions.java | 28 +++++++++++++ .../org/apache/cassandra/cli/CliSessionState.java | 8 +++- .../cli/transport/FramedTransportFactory.java | 30 ++++++++++++++ .../cli/transport/SimpleTransportFactory.java | 32 +++++++++++++++ 6 files changed, 100 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59a6a5d8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 292db49..d803c0f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,7 @@ * add authentication support to sstableloader (CASSANDRA-4712) * Fix CQL3 'is reversed' logic (CASSANDRA-4716) * (CQL3) Don't return ReversedType in result set metadata (CASSANDRA-4717) + * Pluggable Thrift transport factories for CLI (CASSANDRA-4609) Merged from 1.0: * Switch from NBHM to CHM in MessagingService's callback map, which prevents OOM in long-running instances (CASSANDRA-4708) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59a6a5d8/src/java/org/apache/cassandra/cli/CliMain.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliMain.java 
b/src/java/org/apache/cassandra/cli/CliMain.java index 2d900ba..9baf676 100644 --- a/src/java/org/apache/cassandra/cli/CliMain.java +++ b/src/java/org/apache/cassandra/cli/CliMain.java @@ -31,7 +31,6 @@ import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.thrift.*; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; @@ -62,21 +61,14 @@ public class CliMain if (transport != null) transport.close(); - if (sessionState.framed) - { - transport = new TFramedTransport(socket); - } - else - { - transport = socket; - } - + transport = sessionState.transportFactory.getTransport(socket); TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol); try { - transport.open(); + if (!transport.isOpen()) + transport.open(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59a6a5d8/src/java/org/apache/cassandra/cli/CliOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java index c582728..982daf5 100644 --- a/src/java/org/apache/cassandra/cli/CliOptions.java +++ b/src/java/org/apache/cassandra/cli/CliOptions.java @@ -17,7 +17,9 @@ */ package org.apache.cassandra.cli; +import org.apache.cassandra.cli.transport.SimpleTransportFactory; import org.apache.commons.cli.*; +import org.apache.thrift.transport.TTransportFactory; /** * @@ -35,6 +37,7 @@ public class CliOptions private static final String HOST_OPTION = "host"; private static final String PORT_OPTION = "port"; private static final String UNFRAME_OPTION = "unframed"; + private static final String TRANSPORT_FACTORY = "transport-factory"; private static final String 
DEBUG_OPTION = "debug"; private static final String USERNAME_OPTION = "username"; private static final String PASSWORD_OPTION = "password"; @@ -64,6 +67,7 @@ public class CliOptions options.addOption("f", FILE_OPTION, "FILENAME", "load statements from the specific file"); options.addOption(null, JMX_PORT_OPTION, "JMX-PORT", "JMX service port"); options.addOption(null, SCHEMA_MIGRATION_WAIT_TIME, "TIME", "Schema migration wait time (secs.), default is 10 secs"); + options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified TTransportFactory class name for creating a connection to cassandra"); // options without argument options.addOption("B", BATCH_OPTION, "enabled batch mode (suppress output; errors are fatal)"); @@ -98,9 +102,16 @@ public class CliOptions // Look to see if frame has been specified if (cmd.hasOption(UNFRAME_OPTION)) { + if (cmd.hasOption(TRANSPORT_FACTORY)) + throw new IllegalArgumentException("--unframed and --transport-factory options should not be fixed."); + css.framed = false; + css.transportFactory = new SimpleTransportFactory(); } + if (cmd.hasOption(TRANSPORT_FACTORY)) + css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY)); + // Look to see if frame has been specified if (cmd.hasOption(DEBUG_OPTION)) { @@ -222,4 +233,21 @@ public class CliOptions } } + private static TTransportFactory validateAndSetTransportFactory(String transportFactory) + { + try + { + Class factory = Class.forName(transportFactory); + + if(!TTransportFactory.class.isAssignableFrom(factory)) + throw new IllegalArgumentException(String.format("transport factory '%s' " + + "not derived from TTransportFactory", transportFactory)); + + return (TTransportFactory) factory.newInstance(); + } + catch (Exception e) + { + throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e); + } + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59a6a5d8/src/java/org/apache/cassandra/cli/CliSessionState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliSessionState.java b/src/java/org/apache/cassandra/cli/CliSessionState.java index 9f956fd..0284239 100644 --- a/src/java/org/apache/cassandra/cli/CliSessionState.java +++ b/src/java/org/apache/cassandra/cli/CliSessionState.java @@ -18,11 +18,13 @@ package org.apache.cassandra.cli; -import org.apache.cassandra.tools.NodeProbe; - import java.io.InputStream; import java.io.PrintStream; +import org.apache.cassandra.cli.transport.FramedTransportFactory; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.thrift.transport.TTransportFactory; + /** * Used to hold the state for the CLI. */ @@ -41,6 +43,8 @@ public class CliSessionState public int jmxPort = 7199;// JMX service port public boolean verbose = false; // verbose output public int schema_mwt = 10 * 1000; // Schema migration wait time (secs.) + public TTransportFactory transportFactory = new FramedTransportFactory(); + /* * Streams to read/write from */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59a6a5d8/src/java/org/apache/cassandra/cli/transport/FramedTransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/transport/FramedTransportFactory.java b/src/java/org/apache/cassandra/cli/transport/FramedTransportFactory.java new file mode 100644 index 0000000..ffe7b00 --- /dev/null +++ b/src/java/org/apache/cassandra/cli/transport/FramedTransportFactory.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cli.transport; + +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportFactory; + +public class FramedTransportFactory extends TTransportFactory +{ + public TTransport getTransport(TTransport base) + { + return new TFramedTransport(base); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59a6a5d8/src/java/org/apache/cassandra/cli/transport/SimpleTransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/transport/SimpleTransportFactory.java b/src/java/org/apache/cassandra/cli/transport/SimpleTransportFactory.java new file mode 100644 index 0000000..8b46646 --- /dev/null +++ b/src/java/org/apache/cassandra/cli/transport/SimpleTransportFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cli.transport; + +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportFactory; + +/** + * Almost a noop factory, getTransport() method returns back the argument + */ +public class SimpleTransportFactory extends TTransportFactory +{ + public TTransport getTransport(TTransport transport) + { + return transport; + } +}
