Author: tedyu
Date: Mon Jul 29 16:21:28 2013
New Revision: 1508125
URL: http://svn.apache.org/r1508125
Log:
HBASE-8874 PutCombiner is skipping KeyValues while combining puts of same row
during bulkload (Rajeshbabu)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java?rev=1508125&r1=1508124&r2=1508125&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
Mon Jul 29 16:21:28 2013
@@ -19,15 +19,18 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
-import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;
/**
@@ -43,31 +46,50 @@ public class PutCombiner<K> extends Redu
@Override
protected void reduce(K row, Iterable<Put> vals, Context context)
throws IOException, InterruptedException {
-
- int cnt = 0;
- // There's nothing to say <code>K row</code> is the same as the rowkey
- // used to construct Puts (value) instances. Thus the map of put.getRow()
- // to combined Put is necessary.
- // TODO: would be better if we knew <code>K row</code> and Put rowkey were
- // identical. Then this whole Put buffering business goes away.
- // TODO: Could use HeapSize to create an upper bound on the memory size of
- // the puts map and flush some portion of the content while looping. This
+ // Using HeapSize to create an upper bound on the memory size of
+ // the puts and flush some portion of the content while looping. This
// flush could result in multiple Puts for a single rowkey. That is
// acceptable because Combiner is run as an optimization and it's not
// critical that all Puts are grouped perfectly.
- Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
+ long threshold = context.getConfiguration().getLong(
+ "putcombiner.row.threshold", 1L * (1<<30));
+ int cnt = 0;
+ long curSize = 0;
+ Put put = null;
+ Map<byte[], List<? extends Cell>> familyMap = null;
for (Put p : vals) {
cnt++;
- if (!puts.containsKey(p.getRow())) {
- puts.put(p.getRow(), p);
+ if (put == null) {
+ put = p;
+ familyMap = put.getFamilyMap();
} else {
- puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
+ for (Entry<byte[], List<? extends Cell>> entry : p.getFamilyMap()
+ .entrySet()) {
+ List<? extends Cell> cells = familyMap.get(entry.getKey());
+ List<KeyValue> kvs = (cells != null) ? (List<KeyValue>) cells : null;
+ for (Cell cell : entry.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ curSize += kv.heapSize();
+ if (kvs != null) {
+ kvs.add(kv);
+ }
+ }
+ if (cells == null) {
+ familyMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
+ if (curSize > threshold) {
+ LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
+ context.write(row, put);
+ put = null;
+ cnt = 0;
+ }
}
}
-
- for (Put p : puts.values()) {
- context.write(row, p);
+ if (put != null) {
+ LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
+ context.write(row, put);
}
- LOG.info(String.format("Combined %d Put(s) into %d.", cnt, puts.size()));
}
}
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=1508125&r1=1508124&r2=1508125&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
Mon Jul 29 16:21:28 2013
@@ -55,7 +55,7 @@ public class PutSortReducer extends
{
// although reduce() is called per-row, handle pathological case
long threshold = context.getConfiguration().getLong(
- "putsortreducer.row.threshold", 2L * (1<<30));
+ "putsortreducer.row.threshold", 1L * (1<<30));
Iterator<Put> iter = puts.iterator();
while (iter.hasNext()) {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
@@ -67,7 +67,7 @@ public class PutSortReducer extends
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
map.add(kv);
- curSize += kv.getLength();
+ curSize += kv.heapSize();
}
}
}