Updated Branches: refs/heads/trunk 710609baf -> b90462aef
Separate input and output connection details in ConfigHelper. Patch by Mck SembWever, reviewed by brandonwilliams for CASSANDRA-3197 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b90462ae Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b90462ae Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b90462ae Branch: refs/heads/trunk Commit: b90462aefbab4e443a4a4d83da7a30cd4516697f Parents: 710609b Author: Brandon Williams <[email protected]> Authored: Mon Jan 9 10:23:53 2012 -0600 Committer: Brandon Williams <[email protected]> Committed: Mon Jan 9 10:23:53 2012 -0600 ---------------------------------------------------------------------- contrib/pig/README.txt | 10 ++ .../cassandra/hadoop/pig/CassandraStorage.java | 67 +++++++++--- .../org/apache/cassandra/client/RingCache.java | 6 +- .../cassandra/hadoop/ColumnFamilyInputFormat.java | 6 +- .../cassandra/hadoop/ColumnFamilyRecordReader.java | 2 +- .../cassandra/hadoop/ColumnFamilyRecordWriter.java | 2 +- .../org/apache/cassandra/hadoop/ConfigHelper.java | 86 +++++++++++--- .../org/apache/cassandra/client/TestRingCache.java | 6 +- 8 files changed, 139 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/contrib/pig/README.txt ---------------------------------------------------------------------- diff --git a/contrib/pig/README.txt b/contrib/pig/README.txt index 93eceb2..604030e 100644 --- a/contrib/pig/README.txt +++ b/contrib/pig/README.txt @@ -27,6 +27,16 @@ export PIG_INITIAL_ADDRESS=localhost export PIG_RPC_PORT=9160 export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner +These properties can be overridden with the following if you use different clusters +for input and output: +* PIG_INPUT_INITIAL_ADDRESS : initial address to connect to for reading +* PIG_INPUT_RPC_PORT : the port thrift is listening on for reading +* PIG_INPUT_PARTITIONER : cluster partitioner for reading +* PIG_OUTPUT_INITIAL_ADDRESS : initial address to connect to for writing +* PIG_OUTPUT_RPC_PORT : the port thrift is listening on for writing +* PIG_OUTPUT_PARTITIONER : cluster partitioner for writing + + Then you can build and run it like this: contrib/pig$ ant http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 2b41abf..9a84646 100644 --- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; -import org.apache.cassandra.db.SuperColumn; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.thrift.Mutation; @@ -49,15 +48,10 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; import org.apache.pig.ResourceSchema.ResourceFieldSchema; -import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.UDFContext; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; /** * A LoadStoreFunc for retrieving data from and storing data to Cassandra @@ -68,6 +62,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { // system environment variables that can be set to configure connection info: // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper + public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT"; + public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS"; + public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER"; + public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT"; + public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS"; + public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER"; public final static String PIG_RPC_PORT = "PIG_RPC_PORT"; public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; @@ -288,17 +288,36 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private void setConnectionInformation() throws IOException { if (System.getenv(PIG_RPC_PORT) != null) - ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT)); - else if (ConfigHelper.getRpcPort(conf) == 0) - throw new IOException("PIG_RPC_PORT environment variable not set"); + { + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + } + + if (System.getenv(PIG_INPUT_RPC_PORT) != null) + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT)); + if (System.getenv(PIG_OUTPUT_RPC_PORT) != null) + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT)); + if (System.getenv(PIG_INITIAL_ADDRESS) != null) - ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); - else if (ConfigHelper.getInitialAddress(conf) == null) - throw new IOException("PIG_INITIAL_ADDRESS environment variable not set"); + { + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + } + if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS)); + if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS)); + if (System.getenv(PIG_PARTITIONER) != null) - ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER)); - else if (ConfigHelper.getPartitioner(conf) == null) - throw new IOException("PIG_PARTITIONER environment variable not set"); + { + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + } + if(System.getenv(PIG_INPUT_PARTITIONER) != null) + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER)); + if(System.getenv(PIG_OUTPUT_PARTITIONER) != null) + ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER)); + } @Override @@ -314,6 +333,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo } ConfigHelper.setInputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); + + if (ConfigHelper.getInputRpcPort(conf) == 0) + throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); + if (ConfigHelper.getInputInitialAddress(conf) == null) + throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); + if (ConfigHelper.getInputPartitioner(conf) == null) + throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); + initSchema(loadSignature); } @@ -448,6 +475,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo setLocationFromUri(location); ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); + + if (ConfigHelper.getOutputRpcPort(conf) == 0) + throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); + if (ConfigHelper.getOutputInitialAddress(conf) == null) + throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); + if (ConfigHelper.getOutputPartitioner(conf) == null) + throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); + initSchema(storeSignature); } @@ -565,7 +600,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo Cassandra.Client client = null; try { - client = ConfigHelper.getClientFromAddressList(conf); + client = ConfigHelper.getClientFromInputAddressList(conf); CfDef cfDef = null; client.set_keyspace(keyspace); KsDef ksDef = client.describe_keyspace(keyspace); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/client/RingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/client/RingCache.java b/src/java/org/apache/cassandra/client/RingCache.java index 8d6648c..9b28a83 100644 --- a/src/java/org/apache/cassandra/client/RingCache.java +++ b/src/java/org/apache/cassandra/client/RingCache.java @@ -55,7 +55,7 @@ public class RingCache public RingCache(Configuration conf) throws IOException { this.conf = conf; - this.partitioner = ConfigHelper.getPartitioner(conf); + this.partitioner = ConfigHelper.getOutputPartitioner(conf); refreshEndpointMap(); } @@ -63,7 +63,7 @@ public class RingCache { try { - Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); + Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf); List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf)); rangeMap = ArrayListMultimap.create(); @@ -96,7 +96,7 @@ public class RingCache } catch (TException e) { - logger_.debug("Error contacting seed list" + ConfigHelper.getInitialAddress(conf) + " " + e.getMessage()); + logger_.debug("Error contacting seed list" + ConfigHelper.getOutputInitialAddress(conf) + " " + e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index a23e999..c13e881 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -126,7 +126,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B Range<Token> jobRange = null; if (jobKeyRange != null) { - partitioner = ConfigHelper.getPartitioner(context.getConfiguration()); + partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner"; assert jobKeyRange.start_key == null : "only start_token supported"; assert jobKeyRange.end_key == null : "only end_token supported"; @@ -239,7 +239,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B try { - Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getRpcPort(conf), true); + Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getInputRpcPort(conf), true); client.set_keyspace(keyspace); return client.describe_splits(cfName, range.start_token, range.end_token, splitsize); } @@ -262,7 +262,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B private List<TokenRange> getRangeMap(Configuration conf) throws IOException { - Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); + Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); List<TokenRange> map; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index b84eb85..e3d1bb0 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -149,7 +149,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap // create connection using thrift String location = getLocation(); - socket = new TSocket(location, ConfigHelper.getRpcPort(conf)); + socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf)); TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket)); client = new Cassandra.Client(binaryProtocol); socket.open(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java index 328a0f7..5dc7655 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java @@ -314,7 +314,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> try { InetAddress address = iter.next(); - thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getRpcPort(conf)); + thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getOutputRpcPort(conf)); thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(thriftSocket, conf); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index d8a1ab9..47407c0 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -45,7 +45,8 @@ import org.slf4j.LoggerFactory; public class ConfigHelper { - private static final String PARTITIONER_CONFIG = "cassandra.partitioner.class"; + private static final String INPUT_PARTITIONER_CONFIG = "cassandra.input.partitioner.class"; + private static final String OUTPUT_PARTITIONER_CONFIG = "cassandra.output.partitioner.class"; private static final String INPUT_KEYSPACE_CONFIG = "cassandra.input.keyspace"; private static final String OUTPUT_KEYSPACE_CONFIG = "cassandra.output.keyspace"; private static final String INPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.input.keyspace.username"; @@ -56,13 +57,14 @@ public class ConfigHelper private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily"; private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate"; private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange"; - private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate"; private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size"; private static final int DEFAULT_SPLIT_SIZE = 64 * 1024; private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size"; private static final int DEFAULT_RANGE_BATCH_SIZE = 4096; - private static final String THRIFT_PORT = "cassandra.thrift.port"; - private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address"; + private static final String INPUT_THRIFT_PORT = "cassandra.input.thrift.port"; + private static final String OUTPUT_THRIFT_PORT = "cassandra.output.thrift.port"; + private static final String INPUT_INITIAL_THRIFT_ADDRESS = "cassandra.input.thrift.address"; + private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address"; private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read"; private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; @@ -309,36 +311,36 @@ public class ConfigHelper return conf.get(WRITE_CONSISTENCY_LEVEL, "ONE"); } - public static int getRpcPort(Configuration conf) + public static int getInputRpcPort(Configuration conf) { - return Integer.parseInt(conf.get(THRIFT_PORT)); + return Integer.parseInt(conf.get(INPUT_THRIFT_PORT)); } - public static void setRpcPort(Configuration conf, String port) + public static void setInputRpcPort(Configuration conf, String port) { - conf.set(THRIFT_PORT, port); + conf.set(INPUT_THRIFT_PORT, port); } - public static String getInitialAddress(Configuration conf) + public static String getInputInitialAddress(Configuration conf) { - return conf.get(INITIAL_THRIFT_ADDRESS); + return conf.get(INPUT_INITIAL_THRIFT_ADDRESS); } - public static void setInitialAddress(Configuration conf, String address) + public static void setInputInitialAddress(Configuration conf, String address) { - conf.set(INITIAL_THRIFT_ADDRESS, address); + conf.set(INPUT_INITIAL_THRIFT_ADDRESS, address); } - public static void setPartitioner(Configuration conf, String classname) + public static void setInputPartitioner(Configuration conf, String classname) { - conf.set(PARTITIONER_CONFIG, classname); + conf.set(INPUT_PARTITIONER_CONFIG, classname); } - public static IPartitioner getPartitioner(Configuration conf) + public static IPartitioner getInputPartitioner(Configuration conf) { try { - return FBUtilities.newPartitioner(conf.get(PARTITIONER_CONFIG)); + return FBUtilities.newPartitioner(conf.get(INPUT_PARTITIONER_CONFIG)); } catch (ConfigurationException e) { @@ -346,17 +348,63 @@ public class ConfigHelper } } + public static int getOutputRpcPort(Configuration conf) + { + return Integer.parseInt(conf.get(OUTPUT_THRIFT_PORT)); + } + + public static void setOutputRpcPort(Configuration conf, String port) + { + conf.set(OUTPUT_THRIFT_PORT, port); + } + + public static String getOutputInitialAddress(Configuration conf) + { + return conf.get(OUTPUT_INITIAL_THRIFT_ADDRESS); + } + + public static void setOutputInitialAddress(Configuration conf, String address) + { + conf.set(OUTPUT_INITIAL_THRIFT_ADDRESS, address); + } + + public static void setOutputPartitioner(Configuration conf, String classname) + { + conf.set(OUTPUT_PARTITIONER_CONFIG, classname); + } + + public static IPartitioner getOutputPartitioner(Configuration conf) + { + try + { + return FBUtilities.newPartitioner(conf.get(OUTPUT_PARTITIONER_CONFIG)); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e); + } + } + + + public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException + { + return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","), ConfigHelper.getInputRpcPort(conf)); + } + + public static Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws IOException + { + return getClientFromAddressList(conf, ConfigHelper.getOutputInitialAddress(conf).split(","), ConfigHelper.getOutputRpcPort(conf)); + } - public static Cassandra.Client getClientFromAddressList(Configuration conf) throws IOException + private static Cassandra.Client getClientFromAddressList(Configuration conf, String[] addresses, int port) throws IOException { - String[] addresses = ConfigHelper.getInitialAddress(conf).split(","); Cassandra.Client client = null; List<IOException> exceptions = new ArrayList<IOException>(); for (String address : addresses) { try { - client = createConnection(address, ConfigHelper.getRpcPort(conf), true); + client = createConnection(address, port, true); break; } catch (IOException ioe) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/test/unit/org/apache/cassandra/client/TestRingCache.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/client/TestRingCache.java b/test/unit/org/apache/cassandra/client/TestRingCache.java index 58c4d26..4fae42b 100644 --- a/test/unit/org/apache/cassandra/client/TestRingCache.java +++ b/test/unit/org/apache/cassandra/client/TestRingCache.java @@ -63,9 +63,9 @@ public class TestRingCache thriftClient = cassandraClient; String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(); conf = new Configuration(); - ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName()); - ConfigHelper.setInitialAddress(conf, seed); - ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort())); + ConfigHelper.setOutputPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName()); + ConfigHelper.setOutputInitialAddress(conf, seed); + ConfigHelper.setOutputRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort())); }
