Author: jbellis
Date: Fri Feb 12 21:53:15 2010
New Revision: 909622
URL: http://svn.apache.org/viewvc?rev=909622&view=rev
Log:
add basic hadoop support using Thrift, one split per node
patch by jbellis; reviewed by Stu Hood for CASSANDRA-342
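
For anyone trying the patch out, a minimal job-driver sketch follows. Only
ColumnFamilyInputFormat and its setColumnFamily() helper come from this
commit; the job name, the keyspace/column family values, and the
ColumnCountMapper class (sketched after ColumnFamilyInputFormat.java below)
are illustrative assumptions, not part of the patch.

    // Hypothetical driver wiring a Hadoop job to the new input format.
    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CassandraJobDriver
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job(new Configuration(), "cassandra-scan");
            job.setJarByClass(CassandraJobDriver.class);
            job.setInputFormatClass(ColumnFamilyInputFormat.class);
            // validates the pair against the schema, then records them in the
            // job configuration for getSplits() and the record reader
            ColumnFamilyInputFormat.setColumnFamily(job, "Keyspace1", "Standard1");
            job.setMapperClass(ColumnCountMapper.class); // hypothetical mapper
            // output format/path configuration omitted from this sketch
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }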
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java   (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java   (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java   (with props)
Modified:
incubator/cassandra/trunk/ivy.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
Modified: incubator/cassandra/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/ivy.xml?rev=909622&r1=909621&r2=909622&view=diff
==============================================================================
--- incubator/cassandra/trunk/ivy.xml (original)
+++ incubator/cassandra/trunk/ivy.xml Fri Feb 12 21:53:15 2010
@@ -19,6 +19,8 @@
<ivy-module version="2.0">
<info organisation="apache-cassandra" module="cassandra"/>
<dependencies>
+ <dependency org="org.apache.mahout.hadoop"
+ name="hadoop-core" rev="0.20.1"/>
    <!-- FIXME: paranamer and jackson can be dropped after we're depending
on avro (since it depends on them). -->
<dependency org="com.thoughtworks.paranamer"
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=909622&r1=909621&r2=909622&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Feb 12 21:53:15 2010
@@ -47,7 +47,7 @@
     private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
- SuperColumn(byte[] name, AbstractType comparator)
+ public SuperColumn(byte[] name, AbstractType comparator)
{
this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator));
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=909622&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Feb 12 21:53:15 2010
@@ -0,0 +1,112 @@
+package org.apache.cassandra.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+public class ColumnFamilyInputFormat extends InputFormat<String, SortedMap<byte[], IColumn>>
+{
+ private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
+    private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
+
+    private static final Logger logger = Logger.getLogger(ColumnFamilyInputFormat.class);
+
+ private String keyspace;
+ private String columnFamily;
+
+    public static void setColumnFamily(Job job, String keyspace, String columnFamily)
+ {
+ validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+ try
+ {
+ ThriftValidation.validateColumnFamily(keyspace, columnFamily);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new RuntimeException(e);
+ }
+ Configuration conf = job.getConfiguration();
+ conf.set(KEYSPACE_CONFIG, keyspace);
+ conf.set(COLUMNFAMILY_CONFIG, columnFamily);
+ }
+
+    private static void validateNotNullKeyspaceAndColumnFamily(String keyspace, String columnFamily)
+ {
+ if (keyspace == null)
+ {
+ throw new RuntimeException("you forgot to set the keyspace with
setKeyspace()");
+ }
+ if (columnFamily == null)
+ {
+ throw new RuntimeException("you forgot to set the column family
with setColumnFamily()");
+ }
+ }
+
+ public List<InputSplit> getSplits(JobContext context) throws IOException
+ {
+ Configuration conf = context.getConfiguration();
+ keyspace = conf.get(KEYSPACE_CONFIG);
+ columnFamily = conf.get(COLUMNFAMILY_CONFIG);
+ validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+
+ List<TokenRange> map = getRangeMap();
+ ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+ for (TokenRange entry : map)
+ {
+ if (logger.isDebugEnabled())
+                logger.debug("split range is [" + entry.start_token + ", " + entry.end_token + "]");
+ String[] endpoints = entry.endpoints.toArray(new String[0]);
+            splits.add(new ColumnFamilySplit(keyspace, columnFamily, entry.start_token, entry.end_token, endpoints));
+ }
+
+ return splits;
+ }
+
+ private List<TokenRange> getRangeMap() throws IOException
+ {
+        TSocket socket = new TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(),
+                                     DatabaseDescriptor.getThriftPort());
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ try
+ {
+ socket.open();
+ }
+ catch (TTransportException e)
+ {
+ throw new IOException(e);
+ }
+ List<TokenRange> map;
+ try
+ {
+ map = client.describe_ring(keyspace);
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return map;
+ }
+
+ @Override
+    public RecordReader<String, SortedMap<byte[], IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+ {
+ return new ColumnFamilyRecordReader();
+ }
+}
\ No newline at end of file
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
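
The input format hands each map task String row keys and SortedMap<byte[],
IColumn> values, so a consuming mapper would be typed as in this sketch;
the class name and the Text/LongWritable output types are assumptions for
illustration, not part of the patch.

    // Hypothetical mapper consuming the types ColumnFamilyInputFormat emits:
    // the row key as a String and the row's columns keyed by column name.
    import java.io.IOException;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ColumnCountMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, LongWritable>
    {
        @Override
        protected void map(String key, SortedMap<byte[], IColumn> columns, Context context)
            throws IOException, InterruptedException
        {
            // emit (row key, column count) for each row in the split
            context.write(new Text(key), new LongWritable(columns.size()));
        }
    }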
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=909622&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Fri Feb 12 21:53:15 2010
@@ -0,0 +1,196 @@
+package org.apache.cassandra.hadoop;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+public class ColumnFamilyRecordReader extends RecordReader<String, SortedMap<byte[], IColumn>>
+{
+ private static final int ROWS_PER_RANGE_QUERY = 1024;
+
+ private ColumnFamilySplit split;
+ private RowIterator iter;
+ private Pair<String, SortedMap<byte[], IColumn>> currentRow;
+
+ public void close() {}
+
+ public String getCurrentKey()
+ {
+ return currentRow.left;
+ }
+
+ public SortedMap<byte[], IColumn> getCurrentValue()
+ {
+ return currentRow.right;
+ }
+
+ public float getProgress()
+ {
+ return ((float)iter.rowsRead()) / iter.size();
+ }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
+ {
+ this.split = (ColumnFamilySplit) split;
+ iter = new RowIterator();
+ }
+
+ public boolean nextKeyValue() throws IOException
+ {
+ if (!iter.hasNext())
+ return false;
+ currentRow = iter.next();
+ return true;
+ }
+
+    private class RowIterator extends AbstractIterator<Pair<String, SortedMap<byte[], IColumn>>>
+ {
+
+ private List<KeySlice> rows;
+ private int i = 0;
+        private AbstractType comparator = DatabaseDescriptor.getComparator(split.getTable(), split.getColumnFamily());
+
+ private void maybeInit()
+ {
+ if (rows != null)
+ return;
+ TSocket socket = new TSocket(getLocation(),
+ DatabaseDescriptor.getThriftPort());
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ try
+ {
+ socket.open();
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(e);
+ }
+ SliceRange sliceRange = new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY,
+ ArrayUtils.EMPTY_BYTE_ARRAY,
+ false,
+ Integer.MAX_VALUE);
+ KeyRange keyRange = new KeyRange(ROWS_PER_RANGE_QUERY)
+ .setStart_token(split.getStartToken())
+ .setEnd_token(split.getEndToken());
+ // TODO "paging" large rows would be good
+ try
+ {
+                rows = client.get_range_slices(split.getTable(),
+                                               new ColumnParent(split.getColumnFamily()),
+                                               new SlicePredicate().setSlice_range(sliceRange),
+                                               keyRange,
+                                               ConsistencyLevel.ONE);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+        // we don't use endpointsnitch since we are trying to support hadoop nodes that are
+        // not necessarily on Cassandra machines, too.  This should be adequate for single-DC clusters, at least.
+ private String getLocation()
+ {
+ InetAddress[] localAddresses = new InetAddress[0];
+ try
+ {
+                localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ for (InetAddress address : localAddresses)
+ {
+ for (String location : split.getLocations())
+ {
+ InetAddress locationAddress = null;
+ try
+ {
+ locationAddress = InetAddress.getByName(location);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ if (address.equals(locationAddress))
+ {
+ return location;
+ }
+ }
+ }
+ return split.getLocations()[0];
+ }
+
+ public int size()
+ {
+ maybeInit();
+ return rows.size();
+ }
+
+ public int rowsRead()
+ {
+ return i;
+ }
+
+ @Override
+ protected Pair<String, SortedMap<byte[], IColumn>> computeNext()
+ {
+ maybeInit();
+ if (i == rows.size())
+ return endOfData();
+ KeySlice ks = rows.get(i++);
+            SortedMap<byte[], IColumn> map = new TreeMap<byte[], IColumn>(comparator);
+ for (ColumnOrSuperColumn cosc : ks.columns)
+ {
+ IColumn column = unthriftify(cosc);
+ map.put(column.name(), column);
+ }
+ return new Pair<String, SortedMap<byte[], IColumn>>(ks.key, map);
+ }
+ }
+
+ private IColumn unthriftify(ColumnOrSuperColumn cosc)
+ {
+ if (cosc.column == null)
+ return unthriftifySuper(cosc.super_column);
+ return unthriftifySimple(cosc.column);
+ }
+
+ private IColumn unthriftifySuper(SuperColumn super_column)
+ {
+        AbstractType subComparator = DatabaseDescriptor.getSubComparator(split.getTable(), split.getColumnFamily());
+        org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
+ for (Column column : super_column.columns)
+ {
+ sc.addColumn(unthriftifySimple(column));
+ }
+ return sc;
+ }
+
+ private IColumn unthriftifySimple(Column column)
+ {
+        return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+ }
+}
\ No newline at end of file
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=909622&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Fri Feb 12 21:53:15 2010
@@ -0,0 +1,102 @@
+package org.apache.cassandra.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class ColumnFamilySplit extends InputSplit implements Writable
+{
+ private String startToken;
+ private String endToken;
+ private String table;
+ private String columnFamily;
+ private String[] dataNodes;
+
+    public ColumnFamilySplit(String table, String columnFamily, String startToken, String endToken, String[] dataNodes)
+ {
+ assert startToken != null;
+ assert endToken != null;
+ this.startToken = startToken;
+ this.endToken = endToken;
+ this.columnFamily = columnFamily;
+ this.table = table;
+ this.dataNodes = dataNodes;
+ }
+
+ public String getStartToken()
+ {
+ return startToken;
+ }
+
+ public String getEndToken()
+ {
+ return endToken;
+ }
+
+ public String getTable()
+ {
+ return table;
+ }
+
+ public String getColumnFamily()
+ {
+ return columnFamily;
+ }
+
+ // getLength and getLocations satisfy the InputSplit abstraction
+
+ public long getLength()
+ {
+ // only used for sorting splits. we don't have the capability, yet.
+ return 0;
+ }
+
+ public String[] getLocations()
+ {
+ return dataNodes;
+ }
+
+    // This should only be used by ColumnFamilySplit.read()
+ protected ColumnFamilySplit() {}
+
+    // These three methods are for serializing and deserializing
+    // ColumnFamilySplits as needed by the Writable interface.
+ public void write(DataOutput out) throws IOException
+ {
+ out.writeUTF(table);
+ out.writeUTF(columnFamily);
+ out.writeUTF(startToken);
+ out.writeUTF(endToken);
+
+ out.writeInt(dataNodes.length);
+ for (String endPoint : dataNodes)
+ {
+ out.writeUTF(endPoint);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException
+ {
+ table = in.readUTF();
+ columnFamily = in.readUTF();
+ startToken = in.readUTF();
+ endToken = in.readUTF();
+
+ int numOfEndPoints = in.readInt();
+ dataNodes = new String[numOfEndPoints];
+ for(int i = 0; i < numOfEndPoints; i++)
+ {
+ dataNodes[i] = in.readUTF();
+ }
+ }
+
+ public static ColumnFamilySplit read(DataInput in) throws IOException
+ {
+ ColumnFamilySplit w = new ColumnFamilySplit();
+ w.readFields(in);
+ return w;
+ }
+}
\ No newline at end of file
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
------------------------------------------------------------------------------
svn:eol-style = native
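
Since ColumnFamilySplit implements Writable, the framework serializes splits
when shipping them to task trackers. A round-trip sketch using only this
commit's API follows; the keyspace, tokens, and endpoint addresses are made
up, and the driver class itself is scaffolding.

    // Hypothetical check that write() and the static read() factory are
    // inverses. Run with -ea so the asserts are enabled.
    import java.io.*;
    import org.apache.cassandra.hadoop.ColumnFamilySplit;

    public class SplitRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            ColumnFamilySplit original = new ColumnFamilySplit("Keyspace1", "Standard1",
                                                               "0", "85070591730234615865843651857942052864",
                                                               new String[]{ "10.0.0.1", "10.0.0.2" });
            // serialize the split to an in-memory buffer
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));

            // rebuild it from the bytes and compare the fields
            DataInput in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            ColumnFamilySplit copy = ColumnFamilySplit.read(in);
            assert copy.getStartToken().equals(original.getStartToken());
            assert copy.getEndToken().equals(original.getEndToken());
            assert copy.getLocations().length == original.getLocations().length;
        }
    }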