Author: tedyu
Date: Mon Dec 30 20:35:00 2013
New Revision: 1554312
URL: http://svn.apache.org/r1554312
Log:
HBASE-10252 Don't write back to WAL/memstore when Increment amount is zero
Modified:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
Modified:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1554312&r1=1554311&r2=1554312&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Dec 30 20:35:00 2013
@@ -5159,6 +5159,8 @@ public class HRegion implements HeapSize
int idx = 0;
for (Cell kv: family.getValue()) {
long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+ boolean noWriteBack = (amount == 0);
+
Cell c = null;
if (idx < results.size() &&
CellUtil.matchingQualifier(results.get(idx), kv)) {
c = results.get(idx);
@@ -5200,57 +5202,66 @@ public class HRegion implements HeapSize
newKV =
KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
RegionObserver.MutationType.INCREMENT, increment, c,
(Cell) newKV));
}
- kvs.add(newKV);
+ allKVs.add(newKV);
+
+ if (!noWriteBack) {
+ kvs.add(newKV);
- // Prepare WAL updates
- if (writeToWAL) {
- if (walEdits == null) {
- walEdits = new WALEdit();
+ // Prepare WAL updates
+ if (writeToWAL) {
+ if (walEdits == null) {
+ walEdits = new WALEdit();
+ }
+ walEdits.add(newKV);
}
- walEdits.add(newKV);
}
}
//store the kvs to the temporary memstore before writing HLog
- tempMemstore.put(store, kvs);
+ if (!kvs.isEmpty()) {
+ tempMemstore.put(store, kvs);
+ }
}
// Actually write to WAL now
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the orginating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- txid = this.log.appendNoSync(this.getRegionInfo(),
- this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
- EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
- true, nonceGroup, nonce);
- } else {
- recordMutationWithoutWal(increment.getFamilyCellMap());
+ if (walEdits != null && !walEdits.isEmpty()) {
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the orginating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ txid = this.log.appendNoSync(this.getRegionInfo(),
+ this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
+ EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
+ true, nonceGroup, nonce);
+ } else {
+ recordMutationWithoutWal(increment.getFamilyCellMap());
+ }
}
//Actually write to Memstore now
- for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- if (store.getFamily().getMaxVersions() == 1) {
- // upsert if VERSIONS for this CF == 1
- size += store.upsert(entry.getValue(), getSmallestReadPoint());
- } else {
- // otherwise keep older versions around
- for (Cell cell : entry.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- size += store.add(kv);
+ if (!tempMemstore.isEmpty()) {
+ for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+ Store store = entry.getKey();
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (Cell cell : entry.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ size += store.add(kv);
+ }
}
}
- allKVs.addAll(entry.getValue());
+ size = this.addAndGetGlobalMemstoreSize(size);
+ flush = isFlushSize(size);
}
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
} finally {
this.updatesLock.readLock().unlock();
}
} finally {
rowLock.release();
}
- if (writeToWAL) {
+ if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
// sync the transaction log outside the rowlock
syncOrDefer(txid, durability);
}
Modified:
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java?rev=1554312&r1=1554311&r2=1554312&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java Mon Dec 30 20:35:00 2013
@@ -32,7 +32,9 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -130,6 +132,62 @@ public class TestDurability {
verifyHLogCount(wal, 12);
}
+ @Test
+ public void testIncrement() throws Exception {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] col1 = Bytes.toBytes("col1");
+ byte[] col2 = Bytes.toBytes("col2");
+ byte[] col3 = Bytes.toBytes("col3");
+
+ // Setting up region
+ HLog wal = HLogFactory.createHLog(FS, DIR, "myhlogdir",
+ "myhlogdir_archive", CONF);
+ byte[] tableName = Bytes.toBytes("TestIncrement");
+ HRegion region = createHRegion(tableName, "increment", wal, false);
+
+ // col1: amount = 1, 1 write back to WAL
+ Increment inc1 = new Increment(row1);
+ inc1.addColumn(FAMILY, col1, 1);
+ Result res = region.increment(inc1);
+ assertEquals(1, res.size());
+ assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
+ verifyHLogCount(wal, 1);
+
+ // col1: amount = 0, 0 write back to WAL
+ inc1 = new Increment(row1);
+ inc1.addColumn(FAMILY, col1, 0);
+ res = region.increment(inc1);
+ assertEquals(1, res.size());
+ assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
+ verifyHLogCount(wal, 1);
+
+ // col1: amount = 0, col2: amount = 0, col3: amount = 0
+ // 0 write back to WAL
+ inc1 = new Increment(row1);
+ inc1.addColumn(FAMILY, col1, 0);
+ inc1.addColumn(FAMILY, col2, 0);
+ inc1.addColumn(FAMILY, col3, 0);
+ res = region.increment(inc1);
+ assertEquals(3, res.size());
+ assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
+ assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
+ assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
+ verifyHLogCount(wal, 1);
+
+ // col1: amount = 5, col2: amount = 4, col3: amount = 3
+ // 1 write back to WAL
+ inc1 = new Increment(row1);
+ inc1.addColumn(FAMILY, col1, 5);
+ inc1.addColumn(FAMILY, col2, 4);
+ inc1.addColumn(FAMILY, col3, 3);
+ res = region.increment(inc1);
+ assertEquals(3, res.size());
+ assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
+ assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
+ assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
+ verifyHLogCount(wal, 2);
+ }
+
private Put newPut(Durability durability) {
Put p = new Put(ROW);
p.add(FAMILY, COL, COL);