Author: jbellis
Date: Fri Mar 5 20:16:52 2010
New Revision: 919583
URL: http://svn.apache.org/viewvc?rev=919583&view=rev
Log:
move configuration static methods into ConfigHelper. patch by jbellis;
reviewed by johano for CASSANDRA-837
Modified:
incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Modified:
incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=919583&r1=919582&r2=919583&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Fri Mar 5 20:16:52 2010
@@ -25,6 +25,7 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -127,9 +128,9 @@
job.setInputFormatClass(ColumnFamilyInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX +
i));
- ColumnFamilyInputFormat.setColumnFamily(job, KEYSPACE, COLUMN_FAMILY);
+ ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new
SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
- ColumnFamilyInputFormat.setSlicePredicate(job, predicate);
+ ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
job.waitForCompletion(true);
}
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=919583&r1=919582&r2=919583&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Mar 5 20:16:52 2010
@@ -22,38 +22,23 @@
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import org.apache.log4j.Logger;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
public class ColumnFamilyInputFormat extends InputFormat<String,
SortedMap<byte[], IColumn>>
{
- private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
- private static final String COLUMNFAMILY_CONFIG =
"cassandra.input.columnfamily";
- private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
- private static final String INPUT_SPLIT_SIZE_CONFIG =
"cassandra.input.split.size";
private static final Logger logger =
Logger.getLogger(StorageService.class);
@@ -61,79 +46,6 @@
private String columnFamily;
private SlicePredicate predicate;
- public static void setColumnFamily(Job job, String keyspace, String
columnFamily)
- {
- if (keyspace == null)
- {
- throw new UnsupportedOperationException("keyspace may not be
null");
- }
- if (columnFamily == null)
- {
- throw new UnsupportedOperationException("columnfamily may not be
null");
- }
- try
- {
- ThriftValidation.validateColumnFamily(keyspace, columnFamily);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
- Configuration conf = job.getConfiguration();
- conf.set(KEYSPACE_CONFIG, keyspace);
- conf.set(COLUMNFAMILY_CONFIG, columnFamily);
- }
-
- /**
- * Set the size of the input split.
- * This affects the number of maps created, if the number is too small
- * the overhead of each map will take up the bulk of the job time.
- *
- * @param job Job you are about to run.
- * @param splitsize Size of the input split
- */
- public static void setInputSplitSize(Job job, int splitsize)
- {
- job.getConfiguration().setInt(INPUT_SPLIT_SIZE_CONFIG, splitsize);
- }
-
- public static void setSlicePredicate(Job job, SlicePredicate predicate)
- {
- Configuration conf = job.getConfiguration();
- conf.set(PREDICATE_CONFIG, predicateToString(predicate));
- }
-
- private static String predicateToString(SlicePredicate predicate)
- {
- assert predicate != null;
- // this is so awful it's kind of cool!
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- try
- {
- return serializer.toString(predicate, "UTF-8");
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- private SlicePredicate predicateFromString(String st)
- {
- assert st != null;
- TDeserializer deserializer = new TDeserializer(new
TJSONProtocol.Factory());
- SlicePredicate predicate = new SlicePredicate();
- try
- {
- deserializer.deserialize(predicate, st, "UTF-8");
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- return predicate;
- }
-
private void validateConfiguration()
{
if (keyspace == null || columnFamily == null)
@@ -149,15 +61,15 @@
public List<InputSplit> getSplits(JobContext context) throws IOException
{
Configuration conf = context.getConfiguration();
- keyspace = conf.get(KEYSPACE_CONFIG);
- columnFamily = conf.get(COLUMNFAMILY_CONFIG);
- predicate = predicateFromString(conf.get(PREDICATE_CONFIG));
+ predicate = ConfigHelper.getSlicePredicate(conf);
+ keyspace = ConfigHelper.getKeyspace(conf);
+ columnFamily = ConfigHelper.getColumnFamily(conf);
validateConfiguration();
// cannonical ranges and nodes holding replicas
List<TokenRange> masterRangeNodes = getRangeMap();
- int splitsize = context.getConfiguration().getInt(INPUT_SPLIT_SIZE_CONFIG, 16384);
+ int splitsize = ConfigHelper.getInputSplitSize(context.getConfiguration());
// cannonical ranges, split into pieces:
// for each range, pick a live owner and ask it to compute bite-sized
splits