Updated Branches: refs/heads/trunk b90462aef -> bd00a2355
Revert "Separate input and output connection details in ConfigHelper." This reverts commit b90462aefbab4e443a4a4d83da7a30cd4516697f. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd00a235 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd00a235 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd00a235 Branch: refs/heads/trunk Commit: bd00a2355ac576afb57e5f3fe73093c729ed79ed Parents: b90462a Author: Brandon Williams <[email protected]> Authored: Mon Jan 9 11:45:39 2012 -0600 Committer: Brandon Williams <[email protected]> Committed: Mon Jan 9 11:45:39 2012 -0600 ---------------------------------------------------------------------- contrib/pig/README.txt | 10 -- .../cassandra/hadoop/pig/CassandraStorage.java | 67 +++--------- .../org/apache/cassandra/client/RingCache.java | 6 +- .../cassandra/hadoop/ColumnFamilyInputFormat.java | 6 +- .../cassandra/hadoop/ColumnFamilyRecordReader.java | 2 +- .../cassandra/hadoop/ColumnFamilyRecordWriter.java | 2 +- .../org/apache/cassandra/hadoop/ConfigHelper.java | 86 +++----------- .../org/apache/cassandra/client/TestRingCache.java | 6 +- 8 files changed, 46 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/contrib/pig/README.txt ---------------------------------------------------------------------- diff --git a/contrib/pig/README.txt b/contrib/pig/README.txt index 604030e..93eceb2 100644 --- a/contrib/pig/README.txt +++ b/contrib/pig/README.txt @@ -27,16 +27,6 @@ export PIG_INITIAL_ADDRESS=localhost export PIG_RPC_PORT=9160 export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner -These properties can be overridden with the following if you use different clusters -for input and output: -* PIG_INPUT_INITIAL_ADDRESS : initial address to connect to for reading -* PIG_INPUT_RPC_PORT : the port thrift is listening on for reading -* PIG_INPUT_PARTITIONER : cluster partitioner for reading -* PIG_OUTPUT_INITIAL_ADDRESS : initial address to connect to for writing -* PIG_OUTPUT_RPC_PORT : the port thrift is listening on for writing -* PIG_OUTPUT_PARTITIONER : cluster partitioner for writing - - Then you can build and run it like this: contrib/pig$ ant http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 9a84646..2b41abf 100644 --- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.SuperColumn; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.thrift.Mutation; @@ -48,10 +49,15 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.UDFContext; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * A LoadStoreFunc for retrieving data from and storing data to Cassandra @@ -62,12 +68,6 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { // system environment variables that can be set to configure connection info: // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper - public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT"; - public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS"; - public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER"; - public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT"; - public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS"; - public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER"; public final static String PIG_RPC_PORT = "PIG_RPC_PORT"; public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; @@ -288,36 +288,17 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private void setConnectionInformation() throws IOException { if (System.getenv(PIG_RPC_PORT) != null) - { - ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT)); - ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT)); - } - - if (System.getenv(PIG_INPUT_RPC_PORT) != null) - ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT)); - if (System.getenv(PIG_OUTPUT_RPC_PORT) != null) - ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT)); - + ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT)); + else if (ConfigHelper.getRpcPort(conf) == 0) + throw new IOException("PIG_RPC_PORT environment variable not set"); if (System.getenv(PIG_INITIAL_ADDRESS) != null) - { - ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); - ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); - } - if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null) - ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS)); - if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null) - ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS)); - + ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + else if (ConfigHelper.getInitialAddress(conf) == null) + throw new IOException("PIG_INITIAL_ADDRESS environment variable not set"); if (System.getenv(PIG_PARTITIONER) != null) - { - ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER)); - ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER)); - } - if(System.getenv(PIG_INPUT_PARTITIONER) != null) - ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER)); - if(System.getenv(PIG_OUTPUT_PARTITIONER) != null) - ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER)); - + ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER)); + else if (ConfigHelper.getPartitioner(conf) == null) + throw new IOException("PIG_PARTITIONER environment variable not set"); } @Override @@ -333,14 +314,6 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo } ConfigHelper.setInputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); - - if (ConfigHelper.getInputRpcPort(conf) == 0) - throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); - if (ConfigHelper.getInputInitialAddress(conf) == null) - throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); - if (ConfigHelper.getInputPartitioner(conf) == null) - throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); - initSchema(loadSignature); } @@ -475,14 +448,6 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo setLocationFromUri(location); ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); - - if (ConfigHelper.getOutputRpcPort(conf) == 0) - throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); - if (ConfigHelper.getOutputInitialAddress(conf) == null) - throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); - if (ConfigHelper.getOutputPartitioner(conf) == null) - throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); - initSchema(storeSignature); } @@ -600,7 +565,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo Cassandra.Client client = null; try { - client = ConfigHelper.getClientFromInputAddressList(conf); + client = ConfigHelper.getClientFromAddressList(conf); CfDef cfDef = null; client.set_keyspace(keyspace); KsDef ksDef = client.describe_keyspace(keyspace); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/src/java/org/apache/cassandra/client/RingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/client/RingCache.java b/src/java/org/apache/cassandra/client/RingCache.java index 9b28a83..8d6648c 100644 --- a/src/java/org/apache/cassandra/client/RingCache.java +++ b/src/java/org/apache/cassandra/client/RingCache.java @@ -55,7 +55,7 @@ public class RingCache public RingCache(Configuration conf) throws IOException { this.conf = conf; - this.partitioner = ConfigHelper.getOutputPartitioner(conf); + this.partitioner = ConfigHelper.getPartitioner(conf); refreshEndpointMap(); } @@ -63,7 +63,7 @@ public class RingCache { try { - Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf); + Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf)); rangeMap = ArrayListMultimap.create(); @@ -96,7 +96,7 @@ public class RingCache } catch (TException e) { - logger_.debug("Error contacting seed list" + ConfigHelper.getOutputInitialAddress(conf) + " " + e.getMessage()); + logger_.debug("Error contacting seed list" + ConfigHelper.getInitialAddress(conf) + " " + e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index c13e881..a23e999 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -126,7 +126,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B Range<Token> jobRange = null; if (jobKeyRange != null) { - partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); + partitioner = ConfigHelper.getPartitioner(context.getConfiguration()); assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner"; assert jobKeyRange.start_key == null : "only start_token supported"; assert jobKeyRange.end_key == null : "only end_token supported"; @@ -239,7 +239,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B try { - Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getInputRpcPort(conf), true); + Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getRpcPort(conf), true); client.set_keyspace(keyspace); return client.describe_splits(cfName, range.start_token, range.end_token, splitsize); } @@ -262,7 +262,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B private List<TokenRange> getRangeMap(Configuration conf) throws IOException { - Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); + Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf); List<TokenRange> map; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index e3d1bb0..b84eb85 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -149,7 +149,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap // create connection using thrift String location = getLocation(); - socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf)); + socket = new TSocket(location, ConfigHelper.getRpcPort(conf)); TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket)); client = new Cassandra.Client(binaryProtocol); socket.open(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java index 5dc7655..328a0f7 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java @@ -314,7 +314,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> try { InetAddress address = iter.next(); - thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getOutputRpcPort(conf)); + thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getRpcPort(conf)); thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(thriftSocket, conf); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 47407c0..d8a1ab9 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -45,8 +45,7 @@ import org.slf4j.LoggerFactory; public class ConfigHelper { - private static final String INPUT_PARTITIONER_CONFIG = "cassandra.input.partitioner.class"; - private static final String OUTPUT_PARTITIONER_CONFIG = "cassandra.output.partitioner.class"; + private static final String PARTITIONER_CONFIG = "cassandra.partitioner.class"; private static final String INPUT_KEYSPACE_CONFIG = "cassandra.input.keyspace"; private static final String OUTPUT_KEYSPACE_CONFIG = "cassandra.output.keyspace"; private static final String INPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.input.keyspace.username"; @@ -57,14 +56,13 @@ public class ConfigHelper private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily"; private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate"; private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange"; + private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate"; private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size"; private static final int DEFAULT_SPLIT_SIZE = 64 * 1024; private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size"; private static final int DEFAULT_RANGE_BATCH_SIZE = 4096; - private static final String INPUT_THRIFT_PORT = "cassandra.input.thrift.port"; - private static final String OUTPUT_THRIFT_PORT = "cassandra.output.thrift.port"; - private static final String INPUT_INITIAL_THRIFT_ADDRESS = "cassandra.input.thrift.address"; - private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address"; + private static final String THRIFT_PORT = "cassandra.thrift.port"; + private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address"; private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read"; private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; @@ -311,36 +309,36 @@ public class ConfigHelper return conf.get(WRITE_CONSISTENCY_LEVEL, "ONE"); } - public static int getInputRpcPort(Configuration conf) + public static int getRpcPort(Configuration conf) { - return Integer.parseInt(conf.get(INPUT_THRIFT_PORT)); + return Integer.parseInt(conf.get(THRIFT_PORT)); } - public static void setInputRpcPort(Configuration conf, String port) + public static void setRpcPort(Configuration conf, String port) { - conf.set(INPUT_THRIFT_PORT, port); + conf.set(THRIFT_PORT, port); } - public static String getInputInitialAddress(Configuration conf) + public static String getInitialAddress(Configuration conf) { - return conf.get(INPUT_INITIAL_THRIFT_ADDRESS); + return conf.get(INITIAL_THRIFT_ADDRESS); } - public static void setInputInitialAddress(Configuration conf, String address) + public static void setInitialAddress(Configuration conf, String address) { - conf.set(INPUT_INITIAL_THRIFT_ADDRESS, address); + conf.set(INITIAL_THRIFT_ADDRESS, address); } - public static void setInputPartitioner(Configuration conf, String classname) + public static void setPartitioner(Configuration conf, String classname) { - conf.set(INPUT_PARTITIONER_CONFIG, classname); + conf.set(PARTITIONER_CONFIG, classname); } - public static IPartitioner getInputPartitioner(Configuration conf) + public static IPartitioner getPartitioner(Configuration conf) { try { - return FBUtilities.newPartitioner(conf.get(INPUT_PARTITIONER_CONFIG)); + return FBUtilities.newPartitioner(conf.get(PARTITIONER_CONFIG)); } catch (ConfigurationException e) { @@ -348,63 +346,17 @@ public class ConfigHelper } } - public static int getOutputRpcPort(Configuration conf) - { - return Integer.parseInt(conf.get(OUTPUT_THRIFT_PORT)); - } - - public static void setOutputRpcPort(Configuration conf, String port) - { - conf.set(OUTPUT_THRIFT_PORT, port); - } - - public static String getOutputInitialAddress(Configuration conf) - { - return conf.get(OUTPUT_INITIAL_THRIFT_ADDRESS); - } - - public static void setOutputInitialAddress(Configuration conf, String address) - { - conf.set(OUTPUT_INITIAL_THRIFT_ADDRESS, address); - } - - public static void setOutputPartitioner(Configuration conf, String classname) - { - conf.set(OUTPUT_PARTITIONER_CONFIG, classname); - } - - public static IPartitioner getOutputPartitioner(Configuration conf) - { - try - { - return FBUtilities.newPartitioner(conf.get(OUTPUT_PARTITIONER_CONFIG)); - } - catch (ConfigurationException e) - { - throw new RuntimeException(e); - } - } - - - public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException - { - return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","), ConfigHelper.getInputRpcPort(conf)); - } - - public static Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws IOException - { - return getClientFromAddressList(conf, ConfigHelper.getOutputInitialAddress(conf).split(","), ConfigHelper.getOutputRpcPort(conf)); - } - private static Cassandra.Client getClientFromAddressList(Configuration conf, String[] addresses, int port) throws IOException + public static Cassandra.Client getClientFromAddressList(Configuration conf) throws IOException { + String[] addresses = ConfigHelper.getInitialAddress(conf).split(","); Cassandra.Client client = null; List<IOException> exceptions = new ArrayList<IOException>(); for (String address : addresses) { try { - client = createConnection(address, port, true); + client = createConnection(address, ConfigHelper.getRpcPort(conf), true); break; } catch (IOException ioe) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd00a235/test/unit/org/apache/cassandra/client/TestRingCache.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/client/TestRingCache.java b/test/unit/org/apache/cassandra/client/TestRingCache.java index 4fae42b..58c4d26 100644 --- a/test/unit/org/apache/cassandra/client/TestRingCache.java +++ b/test/unit/org/apache/cassandra/client/TestRingCache.java @@ -63,9 +63,9 @@ public class TestRingCache thriftClient = cassandraClient; String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(); conf = new Configuration(); - ConfigHelper.setOutputPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName()); - ConfigHelper.setOutputInitialAddress(conf, seed); - ConfigHelper.setOutputRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort())); + ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName()); + ConfigHelper.setInitialAddress(conf, seed); + ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort())); }
