Author: jdcryans
Date: Fri Sep 24 20:37:44 2010
New Revision: 1001061

URL: http://svn.apache.org/viewvc?rev=1001061&view=rev
Log:
HBASE-3033  [replication] ReplicationSink.replicateEntries improvements

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1001061&r1=1001060&r2=1001061&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Sep 24 20:37:44 2010
@@ -932,6 +932,7 @@ Release 0.21.0 - Unreleased
    HBASE-3017  More log pruning
    HBASE-3022  Change format of enum messages in o.a.h.h.executor package
    HBASE-3001  Ship dependency jars to the cluster for all jobs
+   HBASE-3033  [replication] ReplicationSink.replicateEntries improvements
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1001061&r1=1001060&r2=1001061&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Fri Sep 24 20:37:44 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.replicat
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Delete;
@@ -37,6 +36,8 @@ import org.apache.hadoop.hbase.Stoppable
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -88,7 +89,7 @@ public class ReplicationSink {
    * @param entries
    * @throws IOException
    */
-  public synchronized void replicateEntries(HLog.Entry[] entries)
+  public void replicateEntries(HLog.Entry[] entries)
       throws IOException {
     if (entries.length == 0) {
       return;
@@ -97,8 +98,9 @@ public class ReplicationSink {
     // to the same table.
     try {
       long totalReplicated = 0;
-      byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
-      List<Put> puts = new ArrayList<Put>();
+      // Map of table => list of puts, we only want to flushCommits once per
+      // invocation of this method per table.
+      Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
       for (HLog.Entry entry : entries) {
         WALEdit edit = entry.getEdit();
         List<KeyValue> kvs = edit.getKeyValues();
@@ -115,9 +117,11 @@ public class ReplicationSink {
           }
           delete(entry.getKey().getTablename(), delete);
         } else {
-          // Switching table, flush
-          if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
-            put(lastTable, puts);
+          byte[] table = entry.getKey().getTablename();
+          List<Put> tableList = puts.get(table);
+          if (tableList == null) {
+            tableList = new ArrayList<Put>();
+            puts.put(table, tableList);
           }
           // With mini-batching, we need to expect multiple rows per edit
           byte[] lastKey = kvs.get(0).getRow();
@@ -125,18 +129,19 @@ public class ReplicationSink {
               kvs.get(0).getTimestamp());
           for (KeyValue kv : kvs) {
             if (!Bytes.equals(lastKey, kv.getRow())) {
-              puts.add(put);
+              tableList.add(put);
               put = new Put(kv.getRow(), kv.getTimestamp());
             }
             put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
             lastKey = kv.getRow();
           }
-          puts.add(put);
-          lastTable = entry.getKey().getTablename();
+          tableList.add(put);
         }
         totalReplicated++;
       }
-      put(lastTable, puts);
+      for(byte [] table : puts.keySet()) {
+        put(table, puts.get(table));
+      }
       this.metrics.setAgeOfLastAppliedOp(
           entries[entries.length-1].getKey().getWriteTime());
       this.metrics.appliedBatchesRate.inc(1);
@@ -175,8 +180,6 @@ public class ReplicationSink {
       table = this.pool.getTable(tableName);
       table.put(puts);
       this.metrics.appliedOpsRate.inc(puts.size());
-      this.pool.putTable(table);
-      puts.clear();
     } finally {
       if (table != null) {
         this.pool.putTable(table);
@@ -196,7 +199,6 @@ public class ReplicationSink {
       table = this.pool.getTable(tableName);
       table.delete(delete);
       this.metrics.appliedOpsRate.inc(1);
-      this.pool.putTable(table);
     } finally {
       if (table != null) {
         this.pool.putTable(table);


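For readers skimming the patch: the key change in replicateEntries() is to accumulate Puts per destination table in a map keyed by table name and flush each table's batch exactly once per invocation, instead of flushing whenever consecutive WAL entries switch tables. The standalone sketch below is not part of the commit; the class name, sample table names, and the printed summary are illustrative only. It shows the same grouping idiom, including why the patch uses a TreeMap with Bytes.BYTES_COMPARATOR rather than a HashMap: byte[] keys do not hash or compare by content on their own.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative sketch only, not code from the commit.
public class PerTableBatchingSketch {

  public static void main(String[] args) {
    // Keyed by table name; BYTES_COMPARATOR compares byte[] by content,
    // which a plain HashMap would not do for byte[] keys.
    Map<byte[], List<Put>> batches =
        new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);

    // Hypothetical replicated edits: (table, row) pairs.
    byte[][][] edits = {
        {Bytes.toBytes("t1"), Bytes.toBytes("row1")},
        {Bytes.toBytes("t2"), Bytes.toBytes("row2")},
        {Bytes.toBytes("t1"), Bytes.toBytes("row3")},
    };

    // Group Puts by table, creating each table's list on first use,
    // mirroring the tableList handling in replicateEntries().
    for (byte[][] edit : edits) {
      byte[] table = edit[0];
      List<Put> tableList = batches.get(table);
      if (tableList == null) {
        tableList = new ArrayList<Put>();
        batches.put(table, tableList);
      }
      tableList.add(new Put(edit[1]));
    }

    // One flush per table, mirroring the single put(table, list) call
    // per table at the end of replicateEntries().
    for (Map.Entry<byte[], List<Put>> e : batches.entrySet()) {
      System.out.println(Bytes.toString(e.getKey())
          + " gets " + e.getValue().size() + " put(s) in one batch");
    }
  }
}

The other cleanups in the patch follow from the same flow: the duplicate pool.putTable() calls in put() and delete() are dropped because the finally blocks already return the table to the pool, and puts.clear() is no longer needed since each per-table list is built fresh on every call.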