Author: jbellis
Date: Tue Mar 1 02:52:03 2011
New Revision: 1075627
URL: http://svn.apache.org/viewvc?rev=1075627&view=rev
Log:
fix Hadoop ColumnFamilyOutputFormat dropping of mutations
patch by Eldon Stegall and Jeremy Hanna; reviewed by jbellis for CASSANDRA-2255
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1075627&r1=1075626&r2=1075627&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Tue Mar 1 02:52:03 2011
@@ -325,7 +325,7 @@ implements org.apache.hadoop.mapred.Reco
}
Map<ByteBuffer, Map<String, List<Mutation>>> batch = new
HashMap<ByteBuffer, Map<String, List<Mutation>>>();
- while (batch.size() < batchThreshold)
+ while (mutation != null)
{
Map<String, List<Mutation>> subBatch =
batch.get(mutation.left);
if (subBatch == null)
@@ -333,10 +333,12 @@ implements org.apache.hadoop.mapred.Reco
subBatch = Collections.singletonMap(columnFamily,
(List<Mutation>) new ArrayList<Mutation>());
batch.put(mutation.left, subBatch);
}
-
+
subBatch.get(columnFamily).add(mutation.right);
- if ((mutation = queue.poll()) == null)
+ if (batch.size() >= batchThreshold)
break;
+
+ mutation = queue.poll();
}
Iterator<InetAddress> iter = endpoints.iterator();