Author: tedyu
Date: Mon Jul 29 16:21:28 2013
New Revision: 1508125

URL: http://svn.apache.org/r1508125
Log:
HBASE-8874 PutCombiner is skipping KeyValues while combining puts of same row 
during bulkload (Rajeshbabu)


Modified:
    
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
    
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java

Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java?rev=1508125&r1=1508124&r2=1508125&view=diff
==============================================================================
--- 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
 (original)
+++ 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
 Mon Jul 29 16:21:28 2013
@@ -19,15 +19,18 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Reducer;
 
 /**
@@ -43,31 +46,50 @@ public class PutCombiner<K> extends Redu
   @Override
   protected void reduce(K row, Iterable<Put> vals, Context context)
       throws IOException, InterruptedException {
-
-    int cnt = 0;
-    // There's nothing to say <code>K row</code> is the same as the rowkey
-    // used to construct Puts (value) instances. Thus the map of put.getRow()
-    // to combined Put is necessary.
-    // TODO: would be better if we knew <code>K row</code> and Put rowkey were
-    // identical. Then this whole Put buffering business goes away.
-    // TODO: Could use HeapSize to create an upper bound on the memory size of
-    // the puts map and flush some portion of the content while looping. This
+    // Using HeapSize to create an upper bound on the memory size of
+    // the puts and flush some portion of the content while looping. This
     // flush could result in multiple Puts for a single rowkey. That is
     // acceptable because Combiner is run as an optimization and it's not
     // critical that all Puts are grouped perfectly.
-    Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
+    long threshold = context.getConfiguration().getLong(
+        "putcombiner.row.threshold", 1L * (1<<30));
+    int cnt = 0;
+    long curSize = 0;
+    Put put = null;
+    Map<byte[], List<? extends Cell>> familyMap = null;
     for (Put p : vals) {
       cnt++;
-      if (!puts.containsKey(p.getRow())) {
-        puts.put(p.getRow(), p);
+      if (put == null) {
+        put = p;
+        familyMap = put.getFamilyMap();
       } else {
-        puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
+        for (Entry<byte[], List<? extends Cell>> entry : p.getFamilyMap()
+            .entrySet()) {
+          List<? extends Cell> cells = familyMap.get(entry.getKey());
+          List<KeyValue> kvs = (cells != null) ? (List<KeyValue>) cells : null;
+          for (Cell cell : entry.getValue()) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            curSize += kv.heapSize();
+            if (kvs != null) {
+              kvs.add(kv);
+            }
+          }
+          if (cells == null) {
+            familyMap.put(entry.getKey(), entry.getValue());
+          }
+        }
+        if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
+        if (curSize > threshold) {
+          LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
+          context.write(row, put);
+          put = null;
+          cnt = 0;
+        }
       }
     }
-
-    for (Put p : puts.values()) {
-      context.write(row, p);
+    if (put != null) {
+      LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
+      context.write(row, put);
     }
-    LOG.info(String.format("Combined %d Put(s) into %d.", cnt, puts.size()));
   }
 }

Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=1508125&r1=1508124&r2=1508125&view=diff
==============================================================================
--- 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
 (original)
+++ 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
 Mon Jul 29 16:21:28 2013
@@ -55,7 +55,7 @@ public class PutSortReducer extends
   {
     // although reduce() is called per-row, handle pathological case
     long threshold = context.getConfiguration().getLong(
-        "putsortreducer.row.threshold", 2L * (1<<30));
+        "putsortreducer.row.threshold", 1L * (1<<30));
     Iterator<Put> iter = puts.iterator();
     while (iter.hasNext()) {
       TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
@@ -67,7 +67,7 @@ public class PutSortReducer extends
           for (Cell cell: cells) {
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
             map.add(kv);
-            curSize += kv.getLength();
+            curSize += kv.heapSize();
           }
         }
       }


Reply via email to