Author: jbellis
Date: Mon Jul 26 18:19:25 2010
New Revision: 979398
URL: http://svn.apache.org/viewvc?rev=979398&view=rev
Log:
move DD methods called during hadoop job processing to FBUtilities. patch by
jbellis for CASSANDRA-1280
Modified:
cassandra/branches/cassandra-0.6/contrib/word_count/bin/word_count
cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/branches/cassandra-0.6/contrib/word_count/bin/word_count
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/word_count/bin/word_count?rev=979398&r1=979397&r2=979398&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/word_count/bin/word_count
(original)
+++ cassandra/branches/cassandra-0.6/contrib/word_count/bin/word_count Mon Jul
26 18:19:25 2010
@@ -31,7 +31,7 @@ if [ ! -e $cwd/../build/word_count.jar ]
fi
CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
-CLASSPATH=$CLASSPATH:.:$cwd/../../../build/classes
+CLASSPATH=$CLASSPATH:$cwd/../../../build/classes
for jar in $cwd/../build/lib/jars/*.jar; do
CLASSPATH=$CLASSPATH:$jar
done
Modified: cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=979398&r1=979397&r2=979398&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
(original)
+++ cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Mon
Jul 26 18:19:25 2010
@@ -111,13 +111,12 @@ public class WordCount extends Configure
public int run(String[] args) throws Exception
{
- Configuration conf = getConf();
for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
{
String columnName = "text" + i;
- conf.set(CONF_COLUMN_NAME, columnName);
- Job job = new Job(conf, "wordcount");
+ getConf().set(CONF_COLUMN_NAME, columnName);
+ Job job = new Job(getConf(), "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
@@ -128,7 +127,7 @@ public class WordCount extends Configure
job.setInputFormatClass(ColumnFamilyInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX +
i));
- ConfigHelper.setThriftContact(conf, "localhost", 9160);
+ ConfigHelper.setThriftContact(job.getConfiguration(), "localhost",
9160);
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE,
COLUMN_FAMILY);
SlicePredicate predicate = new
SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=979398&r1=979397&r2=979398&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Mon Jul 26 18:19:25 2010
@@ -268,7 +268,7 @@ public class DatabaseDescriptor
}
try
{
- partitioner = newPartitioner(partitionerClassName);
+ partitioner = FBUtilities.newPartitioner(partitionerClassName);
}
catch (Exception e)
{
@@ -543,22 +543,6 @@ public class DatabaseDescriptor
}
}
- public static IPartitioner newPartitioner(String partitionerClassName)
- {
- if (!partitionerClassName.contains("."))
- partitionerClassName = "org.apache.cassandra.dht." +
partitionerClassName;
-
- try
- {
- Class cls = Class.forName(partitionerClassName);
- return (IPartitioner) cls.getConstructor().newInstance();
- }
- catch (Exception e)
- {
- throw new RuntimeException("Invalid partitioner class " +
partitionerClassName);
- }
- }
-
private static void readTablesFromXml() throws ConfigurationException
{
XMLUtils xmlUtils = null;
@@ -782,7 +766,7 @@ public class DatabaseDescriptor
try
{
- return getComparator(compareWith);
+ return FBUtilities.getComparator(compareWith);
}
catch (Exception e)
{
@@ -792,28 +776,6 @@ public class DatabaseDescriptor
}
}
- public static AbstractType getComparator(String compareWith)
- {
- Class<? extends AbstractType> typeClass;
- try
- {
- if (compareWith == null)
- {
- typeClass = BytesType.class;
- }
- else
- {
- String className = compareWith.contains(".") ? compareWith :
"org.apache.cassandra.db.marshal." + compareWith;
- typeClass = (Class<? extends
AbstractType>)Class.forName(className);
- }
- return typeClass.getConstructor().newInstance();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
/**
* Creates all storage-related directories.
* @throws IOException when a disk problem is encountered.
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=979398&r1=979397&r2=979398&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Mon Jul 26 18:19:25 2010
@@ -31,13 +31,13 @@ import java.util.TreeMap;
import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -122,10 +122,10 @@ public class ColumnFamilyRecordReader ex
try
{
socket.open();
- partitioner =
DatabaseDescriptor.newPartitioner(client.describe_partitioner());
+ partitioner =
FBUtilities.newPartitioner(client.describe_partitioner());
Map<String, String> info =
client.describe_keyspace(keyspace).get(cfName);
- comparator =
DatabaseDescriptor.getComparator(info.get("CompareWith"));
- subComparator =
DatabaseDescriptor.getComparator(info.get("CompareSubcolumnsWith"));
+ comparator =
FBUtilities.getComparator(info.get("CompareWith"));
+ subComparator =
FBUtilities.getComparator(info.get("CompareSubcolumnsWith"));
}
catch (TException e)
{
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=979398&r1=979397&r2=979398&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Mon Jul 26 18:19:25 2010
@@ -21,13 +21,8 @@ package org.apache.cassandra.hadoop;
*/
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TDeserializer;
@@ -46,9 +41,6 @@ public class ConfigHelper
private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
private static final String THRIFT_PORT = "cassandra.thrift.port";
private static final String INITIAL_THRIFT_ADDRESS =
"cassandra.thrift.address";
- private static final String COMPARATOR = "cassandra.input.comparator";
- private static final String SUB_COMPARATOR =
"cassandra.input.subcomparator";
- private static final String PARTITIONER = "cassandra.partitioner";
/**
* Set the keyspace and column family for this job.
@@ -68,14 +60,6 @@ public class ConfigHelper
{
throw new UnsupportedOperationException("columnfamily may not be
null");
}
- try
- {
- ThriftValidation.validateColumnFamily(keyspace, columnFamily);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
conf.set(KEYSPACE_CONFIG, keyspace);
conf.set(COLUMNFAMILY_CONFIG, columnFamily);
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=979398&r1=979397&r2=979398&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
Mon Jul 26 18:19:25 2010
@@ -39,6 +39,9 @@ import org.apache.commons.collections.it
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.thrift.TBase;
@@ -508,4 +511,42 @@ public class FBUtilities
}
}
}
+
+ public static IPartitioner newPartitioner(String partitionerClassName)
+ {
+ if (!partitionerClassName.contains("."))
+ partitionerClassName = "org.apache.cassandra.dht." +
partitionerClassName;
+
+ try
+ {
+ Class cls = Class.forName(partitionerClassName);
+ return (IPartitioner) cls.getConstructor().newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Invalid partitioner class " +
partitionerClassName);
+ }
+ }
+
+ public static AbstractType getComparator(String compareWith)
+ {
+ Class<? extends AbstractType> typeClass;
+ try
+ {
+ if (compareWith == null)
+ {
+ typeClass = BytesType.class;
+ }
+ else
+ {
+ String className = compareWith.contains(".") ? compareWith :
"org.apache.cassandra.db.marshal." + compareWith;
+ typeClass = (Class<? extends
AbstractType>)Class.forName(className);
+ }
+ return typeClass.getConstructor().newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}