Author: jbellis
Date: Fri Sep 3 21:20:10 2010
New Revision: 992475
URL: http://svn.apache.org/viewvc?rev=992475&view=rev
Log:
support for Hadoop Streaming [non-jvm map/reduce via stdin/out]. Patch by Stu Hood; reviewed by jbellis for CASSANDRA-1368
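
Hadoop Streaming drives output formats through the deprecated org.apache.hadoop.mapred API, which is the surface this patch adds to ColumnFamilyOutputFormat. A minimal sketch (not part of the patch) of the entry point streaming exercises, using only methods visible in the diff below; the commented-out ConfigHelper setter name is assumed from the exception message in checkOutputSpecs and may differ:

    import java.io.IOException;

    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class StreamingSpecsSketch
    {
        public static void main(String[] args) throws IOException
        {
            JobConf job = new JobConf();
            // The output keyspace and column family must be configured
            // first; the setter name below is inferred from the exception
            // message in checkOutputSpecs and is an assumption.
            // ConfigHelper.setColumnFamily(job, "Keyspace1", "Standard1");

            // Streaming calls the deprecated two-argument overload added by
            // this patch; the FileSystem argument goes unused. Throws
            // UnsupportedOperationException while keyspace/cf are unset.
            new ColumnFamilyOutputFormat().checkOutputSpecs(null, job);
        }
    }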
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/ivy.xml
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Sep 3 21:20:10 2010
@@ -47,6 +47,8 @@ dev
* change multiget key collection from list to set (CASSANDRA-1329)
* ability to modify keyspaces and column family definitions on a live cluster
(CASSANDRA-1285)
+ * support for Hadoop Streaming [non-jvm map/reduce via stdin/out]
+ (CASSANDRA-1368)
0.7-beta1
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Sep 3 21:20:10 2010
@@ -15,7 +15,8 @@ Features
- Optional per-Column time-to-live field allows expiring data without
having to issue explicit remove commands
- `truncate` thrift method allows clearing an entire ColumnFamily at once
- - Hadoop OutputFormat support
+ - Hadoop OutputFormat and Streaming [non-jvm map/reduce via stdin/out]
+   support
- Up to 8x faster reads from row cache
- A new ByteOrderedPartitioner supports bytes keys with arbitrary content,
and orders keys by their byte value. This should be used in new
Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Fri Sep 3 21:20:10 2010
@@ -169,6 +169,11 @@ protocol Cassandra {
array<CfDef> cf_defs;
}
+ record StreamingMutation {
+ bytes key;
+ Mutation mutation;
+ }
+
record MutationsMapEntry {
bytes key;
map<array<Mutation>> mutations;
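
The new StreamingMutation record pairs a row key with a single Mutation so that streaming output can be framed one record at a time on stdout. A hedged sketch of encoding one record, assuming Avro's specific-record codegen for this protocol (public fields and a SCHEMA$ constant, per the Avro 1.3-era compiler):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.cassandra.avro.Mutation;
    import org.apache.cassandra.avro.StreamingMutation;

    public class StreamingMutationSketch
    {
        // Binary-encodes a single StreamingMutation record.
        public static byte[] encode(ByteBuffer key, Mutation mutation) throws IOException
        {
            StreamingMutation record = new StreamingMutation();
            record.key = key;           // public fields assumed from codegen
            record.mutation = mutation;

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            BinaryEncoder encoder = new BinaryEncoder(bytes);
            new SpecificDatumWriter<StreamingMutation>(StreamingMutation.SCHEMA$).write(record, encoder);
            encoder.flush();
            return bytes.toByteArray();
        }
    }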
Modified: cassandra/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/ivy.xml?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/ivy.xml (original)
+++ cassandra/trunk/ivy.xml Fri Sep 3 21:20:10 2010
@@ -28,6 +28,7 @@
<dependency org="commons-logging" name="commons-logging" rev="1.1.1"/>
<dependency org="org.apache.rat" name="apache-rat" rev="0.6" />
<dependency org="com.cloudera.hadoop" name="hadoop-core" rev="0.20.2-320"/>
+ <dependency org="com.cloudera.hadoop" name="hadoop-streaming" rev="0.20.2-320"/>
<dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"/>
<dependency org="net.java.dev.jna" name="jna" rev="3.2.7"/>
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Fri Sep 3 21:20:10 2010
@@ -69,6 +69,7 @@ import org.slf4j.LoggerFactory;
* </p>
*/
public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+ implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
@@ -85,13 +86,17 @@ public class ColumnFamilyOutputFormat ex
@Override
public void checkOutputSpecs(JobContext context)
{
- Configuration conf = context.getConfiguration();
+ checkOutputSpecs(context.getConfiguration());
+ }
+
+ private void checkOutputSpecs(Configuration conf)
+ {
if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
{
throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
}
}
-
+
/**
* The OutputCommitter for this format does not write any data to the DFS.
*
@@ -107,6 +112,20 @@ public class ColumnFamilyOutputFormat ex
return new NullOutputCommitter();
}
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated @Override
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated @Override
+ public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
+ {
+ return new ColumnFamilyRecordWriter(job);
+ }
+
/**
* Get the {@link RecordWriter} for the given task.
*
@@ -116,7 +135,7 @@ public class ColumnFamilyOutputFormat ex
* @throws IOException
*/
@Override
- public RecordWriter<ByteBuffer,List<Mutation>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+ public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
{
return new ColumnFamilyRecordWriter(context);
}
@@ -126,30 +145,29 @@ public class ColumnFamilyOutputFormat ex
* keyspace, and is logged in with the configured credentials.
*
* @param socket a socket pointing to a particular node, seed or otherwise
- * @param context a job context
+ * @param conf a job configuration
* @return a cassandra client
* @throws InvalidRequestException
* @throws TException
* @throws AuthenticationException
* @throws AuthorizationException
*/
- public static Cassandra.Client createAuthenticatedClient(TSocket socket, JobContext context)
+ public static Cassandra.Client createAuthenticatedClient(TSocket socket, Configuration conf)
throws InvalidRequestException, TException, AuthenticationException, AuthorizationException
{
TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
socket.open();
- client.set_keyspace(ConfigHelper.getOutputKeyspace(context.getConfiguration()));
- if (ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()) != null)
+ client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+ if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
{
Map<String, String> creds = new HashMap<String, String>();
- creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()));
- creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(context.getConfiguration()));
+ creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(conf));
+ creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(conf));
AuthenticationRequest authRequest = new AuthenticationRequest(creds);
client.login(authRequest);
}
return client;
-
}
/**
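
Because createAuthenticatedClient is now keyed off a plain Configuration, the new mapreduce path and the deprecated mapred path can share it. A usage sketch mirroring the EndpointCallable call site later in this patch (host is a placeholder):

    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.thrift.transport.TSocket;

    public class AuthenticatedClientSketch
    {
        public static Cassandra.Client open(String host, Configuration conf) throws Exception
        {
            // createAuthenticatedClient opens the socket, sets the configured
            // output keyspace, and logs in when credentials are present.
            TSocket socket = new TSocket(host, ConfigHelper.getRpcPort(conf));
            return ColumnFamilyOutputFormat.createAuthenticatedClient(socket, conf);
        }
    }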
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Fri Sep 3 21:20:10 2010
@@ -47,6 +47,8 @@ import org.apache.cassandra.thrift.Slice
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -75,10 +77,10 @@ import org.apache.thrift.transport.TSock
* @see OutputFormat
*
*/
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>> implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
{
- // The task attempt context this writer is associated with.
- private final TaskAttemptContext context;
+ // The configuration this writer is associated with.
+ private final Configuration conf;
// The batched set of mutations grouped by endpoints.
private Map<InetAddress,Map<byte[],Map<String,List<Mutation>>>> mutationsByEndpoint;
@@ -104,15 +106,20 @@ final class ColumnFamilyRecordWriter ext
*/
ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
{
- this.context = context;
- this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
- this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()),
-                ConfigHelper.getPartitioner(context.getConfiguration()),
-                ConfigHelper.getInitialAddress(context.getConfiguration()),
-                ConfigHelper.getRpcPort(context.getConfiguration()));
- this.batchThreshold = context.getConfiguration().getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, Long.MAX_VALUE);
+ this(context.getConfiguration());
}
+ ColumnFamilyRecordWriter(Configuration conf) throws IOException
+ {
+ this.conf = conf;
+ this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
+ this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf),
+ ConfigHelper.getPartitioner(conf),
+ ConfigHelper.getInitialAddress(conf),
+ ConfigHelper.getRpcPort(conf));
+ this.batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, Long.MAX_VALUE);
+ }
+
/**
* Return the endpoint responsible for the given key. The selected endpoint is
* one whose token range contains the given key.
@@ -145,7 +152,7 @@ final class ColumnFamilyRecordWriter ext
* @throws IOException
*/
@Override
- public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException, InterruptedException
+ public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException
{
maybeFlush();
byte[] key = copy(keybuff);
@@ -164,11 +171,11 @@ final class ColumnFamilyRecordWriter ext
mutationsByKey.put(key, cfMutation);
}
- List<Mutation> mutationList = cfMutation.get(ConfigHelper.getOutputColumnFamily(context.getConfiguration()));
+ List<Mutation> mutationList = cfMutation.get(ConfigHelper.getOutputColumnFamily(conf));
if (mutationList == null)
{
mutationList = new ArrayList<Mutation>();
- cfMutation.put(ConfigHelper.getOutputColumnFamily(context.getConfiguration()), mutationList);
+ cfMutation.put(ConfigHelper.getOutputColumnFamily(conf), mutationList);
}
for (org.apache.cassandra.avro.Mutation amut : value)
@@ -254,6 +261,13 @@ final class ColumnFamilyRecordWriter ext
flush();
}
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated @Override
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ flush();
+ }
+
/**
* Flush the mutations cache, iff more mutations have been cached than
* {@link #batchThreshold}.
@@ -284,7 +298,7 @@ final class ColumnFamilyRecordWriter ext
List<Future<?>> mutationFutures = new ArrayList<Future<?>>();
for (Map.Entry<InetAddress, Map<byte[], Map<String, List<Mutation>>>> entry : mutationsByEndpoint.entrySet())
{
- mutationFutures.add(executor.submit(new EndpointCallable(context, entry.getKey(), entry.getValue())));
+ mutationFutures.add(executor.submit(new EndpointCallable(conf, entry.getKey(), entry.getValue())));
}
// wait until we have all the results back
for (Future<?> mutationFuture : mutationFutures)
@@ -321,7 +335,7 @@ final class ColumnFamilyRecordWriter ext
public class EndpointCallable implements Callable<Void>
{
- // The task attempt context associated with this callable.
- private TaskAttemptContext taskContext;
+ // The configuration associated with this callable.
+ private Configuration conf;
// The endpoint of the primary replica for the rows being mutated
private InetAddress endpoint;
// The mutations to be performed in the node referenced by {@link
@@ -332,13 +346,14 @@ final class ColumnFamilyRecordWriter ext
* Constructs an {@link EndpointCallable} for the given endpoint and set
* of mutations.
*
+ * @param conf job configuration
* @param endpoint the endpoint wherein to execute the mutations
* @param mutations the mutation map expected by
* {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}
*/
- public EndpointCallable(TaskAttemptContext taskContext, InetAddress endpoint, Map<byte[], Map<String, List<Mutation>>> mutations)
+ public EndpointCallable(Configuration conf, InetAddress endpoint, Map<byte[], Map<String, List<Mutation>>> mutations)
{
- this.taskContext = taskContext;
+ this.conf = conf;
this.endpoint = endpoint;
this.mutations = mutations;
}
@@ -352,8 +367,8 @@ final class ColumnFamilyRecordWriter ext
TSocket socket = null;
try
{
- socket = new TSocket(endpoint.getHostName(), ConfigHelper.getRpcPort(taskContext.getConfiguration()));
- Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket, taskContext);
+ socket = new TSocket(endpoint.getHostName(), ConfigHelper.getRpcPort(conf));
+ Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket, conf);
client.batch_mutate(mutations, ConsistencyLevel.ONE);
return null;
}
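
Taken together, the deprecated checkOutputSpecs/getRecordWriter/write/close surface gives streaming the complete write path. A lifecycle sketch under the same assumptions as the earlier sketches (keyspace setup and Mutation construction elided; BATCH_THRESHOLD is assumed to be publicly accessible, as its use from ColumnFamilyRecordWriter suggests):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.cassandra.avro.Mutation;
    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;

    public class WriterLifecycleSketch
    {
        public static void writeOne(ByteBuffer key, Mutation mutation) throws Exception
        {
            JobConf job = new JobConf();
            // Flush to the replicas after 1000 buffered mutations instead of
            // only at close() (the diff shows the default is Long.MAX_VALUE).
            job.setLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 1000L);
            // Keyspace/column family setup via ConfigHelper elided.

            // The deprecated four-argument overload returns a writer that
            // implements org.apache.hadoop.mapred.RecordWriter.
            RecordWriter<ByteBuffer, List<Mutation>> writer =
                new ColumnFamilyOutputFormat().getRecordWriter(null, job, "part-00000", Reporter.NULL);

            writer.write(key, Arrays.asList(mutation));
            writer.close(Reporter.NULL); // flushes anything still buffered
        }
    }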