Author: jbellis
Date: Fri Mar  5 20:18:03 2010
New Revision: 919584

URL: http://svn.apache.org/viewvc?rev=919584&view=rev
Log:
Remove fields from ColumnFamilySplit (CFSplit) that are redundant with 
information already in the configuration; use the input split size as the 
row count.  Patch by jbellis; reviewed by johano for CASSANDRA-837.

Modified:
    
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java

Modified: 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=919584&r1=919583&r2=919584&view=diff
==============================================================================
--- 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
 (original)
+++ 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
 Fri Mar  5 20:18:03 2010
@@ -42,17 +42,13 @@
 
     private static final Logger logger = 
Logger.getLogger(StorageService.class);
 
-    private String keyspace;
-    private String columnFamily;
-    private SlicePredicate predicate;
-
-    private void validateConfiguration()
+    private void validateConfiguration(Configuration conf)
     {
-        if (keyspace == null || columnFamily == null)
+        if (ConfigHelper.getKeyspace(conf) == null || 
ConfigHelper.getColumnFamily(conf) == null)
         {
             throw new UnsupportedOperationException("you must set the keyspace 
and columnfamily with setColumnFamily()");
         }
-        if (predicate == null)
+        if (ConfigHelper.getSlicePredicate(conf) == null)
         {
             throw new UnsupportedOperationException("you must set the 
predicate with setPredicate");
         }
@@ -61,13 +57,11 @@
     public List<InputSplit> getSplits(JobContext context) throws IOException
     {
         Configuration conf = context.getConfiguration();
-        predicate = ConfigHelper.getSlicePredicate(conf);
-        keyspace = ConfigHelper.getKeyspace(conf);
-        columnFamily = ConfigHelper.getColumnFamily(conf);
-        validateConfiguration();
+
+        validateConfiguration(conf);
 
         // cannonical ranges and nodes holding replicas
-        List<TokenRange> masterRangeNodes = getRangeMap();
+        List<TokenRange> masterRangeNodes = 
getRangeMap(ConfigHelper.getKeyspace(conf));
 
         int splitsize = 
ConfigHelper.getInputSplitSize(context.getConfiguration());
         
@@ -91,7 +85,7 @@
             int i = 1;
             for ( ; i < tokens.size(); i++)
             {
-                ColumnFamilySplit split = new ColumnFamilySplit(keyspace, 
columnFamily, predicate, tokens.get(i - 1), tokens.get(i), endpoints);
+                ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 
1), tokens.get(i), endpoints);
                 logger.debug("adding " + split);
                 splits.add(split);
             }
@@ -128,7 +122,7 @@
         return splits;
     }
 
-    private List<TokenRange> getRangeMap() throws IOException
+    private List<TokenRange> getRangeMap(String keyspace) throws IOException
     {
         TSocket socket = new 
TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(),
                                      DatabaseDescriptor.getThriftPort());

Modified: 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=919584&r1=919583&r2=919584&view=diff
==============================================================================
--- 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 (original)
+++ 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 Fri Mar  5 20:18:03 2010
@@ -38,6 +38,7 @@
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.SuperColumn;
 import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -47,11 +48,13 @@
 
 public class ColumnFamilyRecordReader extends RecordReader<String, 
SortedMap<byte[], IColumn>>
 {
-    private static final int ROWS_PER_RANGE_QUERY = 1024;
-
     private ColumnFamilySplit split;
     private RowIterator iter;
     private Pair<String, SortedMap<byte[], IColumn>> currentRow;
+    private SlicePredicate predicate;
+    private int rowCount;
+    private String cfName;
+    private String keyspace;
 
     public void close() {}
     
@@ -73,6 +76,11 @@
     public void initialize(InputSplit split, TaskAttemptContext context) 
throws IOException
     {
         this.split = (ColumnFamilySplit) split;
+        Configuration conf = context.getConfiguration();
+        predicate = ConfigHelper.getSlicePredicate(conf);
+        rowCount = ConfigHelper.getInputSplitSize(conf);
+        cfName = ConfigHelper.getColumnFamily(conf);
+        keyspace = ConfigHelper.getKeyspace(conf);
         iter = new RowIterator();
     }
     
@@ -89,7 +97,7 @@
 
         private List<KeySlice> rows;
         private int i = 0;
-        private AbstractType comparator = 
DatabaseDescriptor.getComparator(split.getTable(), split.getColumnFamily());
+        private AbstractType comparator = 
DatabaseDescriptor.getComparator(keyspace, cfName);
 
         private void maybeInit()
         {
@@ -107,14 +115,14 @@
             {
                 throw new RuntimeException(e);
             }
-            KeyRange keyRange = new KeyRange(ROWS_PER_RANGE_QUERY)
+            KeyRange keyRange = new KeyRange(rowCount)
                                 .setStart_token(split.getStartToken())
                                 .setEnd_token(split.getEndToken());
             try
             {
-                rows = client.get_range_slices(split.getTable(),
-                                               new 
ColumnParent(split.getColumnFamily()),
-                                               split.getPredicate(),
+                rows = client.get_range_slices(keyspace,
+                                               new ColumnParent(cfName),
+                                               predicate,
                                                keyRange,
                                                ConsistencyLevel.ONE);
             }
@@ -196,7 +204,7 @@
 
     private IColumn unthriftifySuper(SuperColumn super_column)
     {
-        AbstractType subComparator = 
DatabaseDescriptor.getSubComparator(split.getTable(), split.getColumnFamily());
+        AbstractType subComparator = 
DatabaseDescriptor.getSubComparator(keyspace, cfName);
         org.apache.cassandra.db.SuperColumn sc = new 
org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
         for (Column column : super_column.columns)
         {

Modified: 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=919584&r1=919583&r2=919584&view=diff
==============================================================================
--- 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
 (original)
+++ 
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
 Fri Mar  5 20:18:03 2010
@@ -39,20 +39,14 @@
 {
     private String startToken;
     private String endToken;
-    private String table;
-    private String columnFamily;
     private String[] dataNodes;
-    private SlicePredicate predicate;
 
-    public ColumnFamilySplit(String table, String columnFamily, SlicePredicate 
predicate, String startToken, String endToken, String[] dataNodes)
+    public ColumnFamilySplit(String startToken, String endToken, String[] 
dataNodes)
     {
         assert startToken != null;
         assert endToken != null;
         this.startToken = startToken;
         this.endToken = endToken;
-        this.columnFamily = columnFamily;
-        this.predicate = predicate;
-        this.table = table;
         this.dataNodes = dataNodes;
     }
 
@@ -66,21 +60,6 @@
         return endToken;
     }
 
-    public String getTable()
-    {
-        return table;
-    }
-
-    public String getColumnFamily()
-    {
-        return columnFamily;
-    }
-
-    public SlicePredicate getPredicate()
-    {
-        return predicate;
-    }
-
     // getLength and getLocations satisfy the InputSplit abstraction
     
     public long getLength()
@@ -97,18 +76,12 @@
     // This should only be used by KeyspaceSplit.read();
     protected ColumnFamilySplit() {}
 
-    private static final TSerializer tSerializer = new TSerializer(new 
TBinaryProtocol.Factory());
-    private static final TDeserializer tDeserializer = new TDeserializer(new 
TBinaryProtocol.Factory());
-
     // These three methods are for serializing and deserializing
     // KeyspaceSplits as needed by the Writable interface.
     public void write(DataOutput out) throws IOException
     {
-        out.writeUTF(table);
-        out.writeUTF(columnFamily);
         out.writeUTF(startToken);
         out.writeUTF(endToken);
-        FBUtilities.serialize(tSerializer, predicate, out);
 
         out.writeInt(dataNodes.length);
         for (String endPoint : dataNodes)
@@ -119,12 +92,8 @@
 
     public void readFields(DataInput in) throws IOException
     {
-        table = in.readUTF();
-        columnFamily = in.readUTF();
         startToken = in.readUTF();
         endToken = in.readUTF();
-        predicate = new SlicePredicate();
-        FBUtilities.deserialize(tDeserializer, predicate, in);
 
         int numOfEndPoints = in.readInt();
         dataNodes = new String[numOfEndPoints];
@@ -140,10 +109,7 @@
         return "ColumnFamilySplit{" +
                "startToken='" + startToken + '\'' +
                ", endToken='" + endToken + '\'' +
-               ", table='" + table + '\'' +
-               ", columnFamily='" + columnFamily + '\'' +
                ", dataNodes=" + (dataNodes == null ? null : 
Arrays.asList(dataNodes)) +
-               ", predicate=" + predicate +
                '}';
     }
 


Reply via email to