Author: jbellis
Date: Fri Mar 5 20:18:03 2010
New Revision: 919584
URL: http://svn.apache.org/viewvc?rev=919584&view=rev
Log:
Remove fields from ColumnFamilySplit that are redundant with information already
in the job configuration; use the input split size as the row count.
Patch by jbellis; reviewed by johano for CASSANDRA-837
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=919584&r1=919583&r2=919584&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Fri Mar 5 20:18:03 2010
@@ -42,17 +42,13 @@
private static final Logger logger =
Logger.getLogger(StorageService.class);
- private String keyspace;
- private String columnFamily;
- private SlicePredicate predicate;
-
- private void validateConfiguration()
+ private void validateConfiguration(Configuration conf)
{
- if (keyspace == null || columnFamily == null)
+ if (ConfigHelper.getKeyspace(conf) == null ||
ConfigHelper.getColumnFamily(conf) == null)
{
throw new UnsupportedOperationException("you must set the keyspace
and columnfamily with setColumnFamily()");
}
- if (predicate == null)
+ if (ConfigHelper.getSlicePredicate(conf) == null)
{
throw new UnsupportedOperationException("you must set the
predicate with setPredicate");
}
@@ -61,13 +57,11 @@
public List<InputSplit> getSplits(JobContext context) throws IOException
{
Configuration conf = context.getConfiguration();
- predicate = ConfigHelper.getSlicePredicate(conf);
- keyspace = ConfigHelper.getKeyspace(conf);
- columnFamily = ConfigHelper.getColumnFamily(conf);
- validateConfiguration();
+
+ validateConfiguration(conf);
// cannonical ranges and nodes holding replicas
- List<TokenRange> masterRangeNodes = getRangeMap();
+ List<TokenRange> masterRangeNodes =
getRangeMap(ConfigHelper.getKeyspace(conf));
int splitsize =
ConfigHelper.getInputSplitSize(context.getConfiguration());
@@ -91,7 +85,7 @@
int i = 1;
for ( ; i < tokens.size(); i++)
{
- ColumnFamilySplit split = new ColumnFamilySplit(keyspace,
columnFamily, predicate, tokens.get(i - 1), tokens.get(i), endpoints);
+ ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i -
1), tokens.get(i), endpoints);
logger.debug("adding " + split);
splits.add(split);
}
@@ -128,7 +122,7 @@
return splits;
}
- private List<TokenRange> getRangeMap() throws IOException
+ private List<TokenRange> getRangeMap(String keyspace) throws IOException
{
TSocket socket = new
TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(),
DatabaseDescriptor.getThriftPort());
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=919584&r1=919583&r2=919584&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Fri Mar 5 20:18:03 2010
@@ -38,6 +38,7 @@
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -47,11 +48,13 @@
public class ColumnFamilyRecordReader extends RecordReader<String,
SortedMap<byte[], IColumn>>
{
- private static final int ROWS_PER_RANGE_QUERY = 1024;
-
private ColumnFamilySplit split;
private RowIterator iter;
private Pair<String, SortedMap<byte[], IColumn>> currentRow;
+ private SlicePredicate predicate;
+ private int rowCount;
+ private String cfName;
+ private String keyspace;
public void close() {}
@@ -73,6 +76,11 @@
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException
{
this.split = (ColumnFamilySplit) split;
+ Configuration conf = context.getConfiguration();
+ predicate = ConfigHelper.getSlicePredicate(conf);
+ rowCount = ConfigHelper.getInputSplitSize(conf);
+ cfName = ConfigHelper.getColumnFamily(conf);
+ keyspace = ConfigHelper.getKeyspace(conf);
iter = new RowIterator();
}
@@ -89,7 +97,7 @@
private List<KeySlice> rows;
private int i = 0;
- private AbstractType comparator =
DatabaseDescriptor.getComparator(split.getTable(), split.getColumnFamily());
+ private AbstractType comparator =
DatabaseDescriptor.getComparator(keyspace, cfName);
private void maybeInit()
{
@@ -107,14 +115,14 @@
{
throw new RuntimeException(e);
}
- KeyRange keyRange = new KeyRange(ROWS_PER_RANGE_QUERY)
+ KeyRange keyRange = new KeyRange(rowCount)
.setStart_token(split.getStartToken())
.setEnd_token(split.getEndToken());
try
{
- rows = client.get_range_slices(split.getTable(),
- new
ColumnParent(split.getColumnFamily()),
- split.getPredicate(),
+ rows = client.get_range_slices(keyspace,
+ new ColumnParent(cfName),
+ predicate,
keyRange,
ConsistencyLevel.ONE);
}
@@ -196,7 +204,7 @@
private IColumn unthriftifySuper(SuperColumn super_column)
{
- AbstractType subComparator =
DatabaseDescriptor.getSubComparator(split.getTable(), split.getColumnFamily());
+ AbstractType subComparator =
DatabaseDescriptor.getSubComparator(keyspace, cfName);
org.apache.cassandra.db.SuperColumn sc = new
org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
for (Column column : super_column.columns)
{
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=919584&r1=919583&r2=919584&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
Fri Mar 5 20:18:03 2010
@@ -39,20 +39,14 @@
{
private String startToken;
private String endToken;
- private String table;
- private String columnFamily;
private String[] dataNodes;
- private SlicePredicate predicate;
- public ColumnFamilySplit(String table, String columnFamily, SlicePredicate
predicate, String startToken, String endToken, String[] dataNodes)
+ public ColumnFamilySplit(String startToken, String endToken, String[]
dataNodes)
{
assert startToken != null;
assert endToken != null;
this.startToken = startToken;
this.endToken = endToken;
- this.columnFamily = columnFamily;
- this.predicate = predicate;
- this.table = table;
this.dataNodes = dataNodes;
}
@@ -66,21 +60,6 @@
return endToken;
}
- public String getTable()
- {
- return table;
- }
-
- public String getColumnFamily()
- {
- return columnFamily;
- }
-
- public SlicePredicate getPredicate()
- {
- return predicate;
- }
-
// getLength and getLocations satisfy the InputSplit abstraction
public long getLength()
@@ -97,18 +76,12 @@
// This should only be used by KeyspaceSplit.read();
protected ColumnFamilySplit() {}
- private static final TSerializer tSerializer = new TSerializer(new
TBinaryProtocol.Factory());
- private static final TDeserializer tDeserializer = new TDeserializer(new
TBinaryProtocol.Factory());
-
// These three methods are for serializing and deserializing
// KeyspaceSplits as needed by the Writable interface.
public void write(DataOutput out) throws IOException
{
- out.writeUTF(table);
- out.writeUTF(columnFamily);
out.writeUTF(startToken);
out.writeUTF(endToken);
- FBUtilities.serialize(tSerializer, predicate, out);
out.writeInt(dataNodes.length);
for (String endPoint : dataNodes)
@@ -119,12 +92,8 @@
public void readFields(DataInput in) throws IOException
{
- table = in.readUTF();
- columnFamily = in.readUTF();
startToken = in.readUTF();
endToken = in.readUTF();
- predicate = new SlicePredicate();
- FBUtilities.deserialize(tDeserializer, predicate, in);
int numOfEndPoints = in.readInt();
dataNodes = new String[numOfEndPoints];
@@ -140,10 +109,7 @@
return "ColumnFamilySplit{" +
"startToken='" + startToken + '\'' +
", endToken='" + endToken + '\'' +
- ", table='" + table + '\'' +
- ", columnFamily='" + columnFamily + '\'' +
", dataNodes=" + (dataNodes == null ? null :
Arrays.asList(dataNodes)) +
- ", predicate=" + predicate +
'}';
}