Author: jdcryans
Date: Fri Sep 24 20:37:44 2010
New Revision: 1001061
URL: http://svn.apache.org/viewvc?rev=1001061&view=rev
Log:
HBASE-3033 [replication] ReplicationSink.replicateEntries improvements
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1001061&r1=1001060&r2=1001061&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Sep 24 20:37:44 2010
@@ -932,6 +932,7 @@ Release 0.21.0 - Unreleased
HBASE-3017 More log pruning
HBASE-3022 Change format of enum messages in o.a.h.h.executor package
HBASE-3001 Ship dependency jars to the cluster for all jobs
+ HBASE-3033 [replication] ReplicationSink.replicateEntries improvements
NEW FEATURES
HBASE-1961 HBase EC2 scripts
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1001061&r1=1001060&r2=1001061&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Fri Sep 24 20:37:44 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.replicat
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
@@ -37,6 +36,8 @@ import org.apache.hadoop.hbase.Stoppable
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -88,7 +89,7 @@ public class ReplicationSink {
* @param entries
* @throws IOException
*/
- public synchronized void replicateEntries(HLog.Entry[] entries)
+ public void replicateEntries(HLog.Entry[] entries)
throws IOException {
if (entries.length == 0) {
return;
@@ -97,8 +98,9 @@ public class ReplicationSink {
// to the same table.
try {
long totalReplicated = 0;
- byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
- List<Put> puts = new ArrayList<Put>();
+ // Map of table => list of puts, we only want to flushCommits once per
+ // invocation of this method per table.
+ Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
for (HLog.Entry entry : entries) {
WALEdit edit = entry.getEdit();
List<KeyValue> kvs = edit.getKeyValues();
@@ -115,9 +117,11 @@ public class ReplicationSink {
}
delete(entry.getKey().getTablename(), delete);
} else {
- // Switching table, flush
- if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
- put(lastTable, puts);
+ byte[] table = entry.getKey().getTablename();
+ List<Put> tableList = puts.get(table);
+ if (tableList == null) {
+ tableList = new ArrayList<Put>();
+ puts.put(table, tableList);
}
// With mini-batching, we need to expect multiple rows per edit
byte[] lastKey = kvs.get(0).getRow();
@@ -125,18 +129,19 @@ public class ReplicationSink {
kvs.get(0).getTimestamp());
for (KeyValue kv : kvs) {
if (!Bytes.equals(lastKey, kv.getRow())) {
- puts.add(put);
+ tableList.add(put);
put = new Put(kv.getRow(), kv.getTimestamp());
}
put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
lastKey = kv.getRow();
}
- puts.add(put);
- lastTable = entry.getKey().getTablename();
+ tableList.add(put);
}
totalReplicated++;
}
- put(lastTable, puts);
+ for(byte [] table : puts.keySet()) {
+ put(table, puts.get(table));
+ }
this.metrics.setAgeOfLastAppliedOp(
entries[entries.length-1].getKey().getWriteTime());
this.metrics.appliedBatchesRate.inc(1);
@@ -175,8 +180,6 @@ public class ReplicationSink {
table = this.pool.getTable(tableName);
table.put(puts);
this.metrics.appliedOpsRate.inc(puts.size());
- this.pool.putTable(table);
- puts.clear();
} finally {
if (table != null) {
this.pool.putTable(table);
@@ -196,7 +199,6 @@ public class ReplicationSink {
table = this.pool.getTable(tableName);
table.delete(delete);
this.metrics.appliedOpsRate.inc(1);
- this.pool.putTable(table);
} finally {
if (table != null) {
this.pool.putTable(table);