HBASE-18116 Replication source in-memory accounting should not include bulk 
transfer hfiles

Signed-off-by: Andrew Purtell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a11701ec
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a11701ec
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a11701ec

Branch: refs/heads/HBASE-20331
Commit: a11701ecc5094d0c8cd96a290d86e52df83a9707
Parents: 0968668
Author: Xu Cang <[email protected]>
Authored: Thu May 31 20:00:04 2018 -0700
Committer: Andrew Purtell <[email protected]>
Committed: Fri Jun 1 11:15:47 2018 -0700

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  1 +
 .../regionserver/ReplicationSourceShipper.java  | 19 ++++++++++++++++-
 .../ReplicationSourceWALReader.java             | 22 +++++++++++++++-----
 .../regionserver/TestGlobalThrottler.java       | 20 ++++++++----------
 4 files changed, 45 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a11701ec/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4051efe..d21d83c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -598,6 +598,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   }
 
   @Override
+  //offsets totalBufferUsed by deducting shipped batchSize.
   public void postShipEdits(List<Entry> entries, int batchSize) {
     if (throttler.isEnabled()) {
       throttler.addPushSize(batchSize);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11701ec/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 11fd660..123ecbe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -124,6 +124,18 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   /**
+   * Get the total batch entry size, excluding bulk load file sizes.
+   * Uses ReplicationSourceWALReader's static method.
+   */
+  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
+    int totalSize = 0;
+    for(Entry entry : entryBatch.getWalEntries()) {
+      totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
+    }
+    return  totalSize;
+  }
+
+  /**
    * Do the shipping logic
    */
   private void shipEdits(WALEntryBatch entryBatch) {
@@ -139,6 +151,7 @@ public class ReplicationSourceShipper extends Thread {
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
+    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
     while (isActive()) {
       try {
         try {
@@ -175,7 +188,11 @@ public class ReplicationSourceShipper extends Thread {
         // Log and clean up WAL logs
         updateLogPosition(entryBatch);
 
-        source.postShipEdits(entries, currentSize);
+        //offsets totalBufferUsed by deducting shipped batchSize (excludes 
bulk load size)
+        //this sizeExcludeBulkLoad has to use the same calculation as when 
calling
+        //acquireBufferQuota() in ReplicationSourceWALReader because they 
maintain
+        //same variable: totalBufferUsed
+        source.postShipEdits(entries, sizeExcludeBulkLoad);
         // FIXME check relationship between wal group and overall
         source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), 
currentSize,
           entryBatch.getNbHFiles());

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11701ec/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 64fd48d..f685a9b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -168,10 +169,12 @@ class ReplicationSourceWALReader extends Thread {
     if (edit == null || edit.isEmpty()) {
       return false;
     }
-    long entrySize = getEntrySize(entry);
+    long entrySize = getEntrySizeIncludeBulkLoad(entry);
+    long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry);
     updateBatchStats(batch, entry, entrySize);
-    boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+
     // Stop if too many entries or too big
     return totalBufferTooLarge || batch.getHeapSize() >= 
replicationBatchSizeCapacity ||
       batch.getNbEntries() >= replicationBatchCountCapacity;
@@ -296,11 +299,20 @@ class ReplicationSourceWALReader extends Thread {
     return entryBatchQueue.take();
   }
 
-  private long getEntrySize(Entry entry) {
+  private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
-    return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
+    WALKey key = entry.getKey();
+    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
+        key.estimatedSerializedSizeOf();
   }
 
+  public static long getEntrySizeExcludeBulkLoad(Entry entry) {
+    WALEdit edit = entry.getEdit();
+    WALKey key = entry.getKey();
+    return edit.heapSize() + key.estimatedSerializedSizeOf();
+  }
+
+
   private void updateBatchStats(WALEntryBatch batch, Entry entry, long 
entrySize) {
     WALEdit edit = entry.getEdit();
     batch.incrementHeapSize(entrySize);
@@ -353,7 +365,7 @@ class ReplicationSourceWALReader extends Thread {
    * @param edit edit to count row keys from
    * @return the total size of the store files
    */
-  private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
+  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
     List<Cell> cells = edit.getCells();
     int totalStoreFilesSize = 0;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a11701ec/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
index d3b4e8e..fecce02 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
@@ -64,6 +64,8 @@ public class TestGlobalThrottler {
       HBaseClassTestRule.forClass(TestGlobalThrottler.class);
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestGlobalThrottler.class);
+  private static final int REPLICATION_SOURCE_QUOTA = 200;
+  private static int numOfPeer = 0;
   private static Configuration conf1;
   private static Configuration conf2;
 
@@ -84,7 +86,7 @@ public class TestGlobalThrottler {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     conf1.setLong("replication.source.sleepforretries", 100);
     // Each WAL is about 120 bytes
-    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 
REPLICATION_SOURCE_QUOTA);
     conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
 
     utility1 = new HBaseTestingUtility(conf1);
@@ -109,6 +111,7 @@ public class TestGlobalThrottler {
     admin1.addPeer("peer1", rpc, null);
     admin1.addPeer("peer2", rpc, null);
     admin1.addPeer("peer3", rpc, null);
+    numOfPeer = admin1.getPeersCount();
   }
 
   @AfterClass
@@ -140,7 +143,10 @@ public class TestGlobalThrottler {
         if (size > 0) {
           testQuotaNonZero = true;
         }
-        if (size > 600) {
+        //the reason for doing "numOfPeer + 1" here is that with method 
addEntryToBatch(), even if the
+        // batch size (after adding the last entry) exceeds the quota, the last 
entry is still kept in the batch,
+        // so the total used buffer size can be one 
"replication.total.buffer.quota" larger than expected
+        if (size > REPLICATION_SOURCE_QUOTA * (numOfPeer + 1)) {
           // We read logs first then check throttler, so if the buffer quota 
limiter doesn't
           // take effect, it will push many logs and exceed the quota.
           testQuotaPass = false;
@@ -181,13 +187,5 @@ public class TestGlobalThrottler {
     Assert.assertTrue(testQuotaNonZero);
   }
 
-  private List<Integer> getRowNumbers(List<Cell> cells) {
-    List<Integer> listOfRowNumbers = new ArrayList<>(cells.size());
-    for (Cell c : cells) {
-      listOfRowNumbers.add(Integer.parseInt(Bytes
-          .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
-              c.getRowLength() - ROW.length)));
-    }
-    return listOfRowNumbers;
-  }
+
 }

Reply via email to