HBASE-18116 Fix replication source in-memory calculation by excluding bulk load files
Signed-off-by: Andrew Purtell <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6f3f3422 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6f3f3422 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6f3f3422 Branch: refs/heads/HBASE-20331 Commit: 6f3f34227e83dd1811a06066d1d2bb913299a297 Parents: d909ec5 Author: Xu Cang <[email protected]> Authored: Fri May 25 16:46:42 2018 -0700 Committer: Andrew Purtell <[email protected]> Committed: Thu May 31 14:22:12 2018 -0700 ---------------------------------------------------------------------- .../ReplicationSourceWALReader.java | 22 +++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6f3f3422/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 64fd48d..ee54931 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -168,10 +169,12 @@ class ReplicationSourceWALReader extends Thread { if (edit 
== null || edit.isEmpty()) { return false; } - long entrySize = getEntrySize(entry); + long entrySize = getEntrySizeIncludeBulkLoad(entry); + long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry); updateBatchStats(batch, entry, entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(entrySize); + boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad); + // Stop if too many entries or too big return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || batch.getNbEntries() >= replicationBatchCountCapacity; @@ -296,11 +299,20 @@ class ReplicationSourceWALReader extends Thread { return entryBatchQueue.take(); } - private long getEntrySize(Entry entry) { + private long getEntrySizeIncludeBulkLoad(Entry entry) { WALEdit edit = entry.getEdit(); - return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); + WALKey key = entry.getKey(); + return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) + + key.estimatedSerializedSizeOf(); } + private long getEntrySizeExcludeBulkLoad(Entry entry) { + WALEdit edit = entry.getEdit(); + WALKey key = entry.getKey(); + return edit.heapSize() + key.estimatedSerializedSizeOf(); + } + + private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { WALEdit edit = entry.getEdit(); batch.incrementHeapSize(entrySize); @@ -353,7 +365,7 @@ class ReplicationSourceWALReader extends Thread { * @param edit edit to count row keys from * @return the total size of the store files */ - private int calculateTotalSizeOfStoreFiles(WALEdit edit) { + private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { List<Cell> cells = edit.getCells(); int totalStoreFilesSize = 0;
