This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 124af63  HBASE-24779 Report on the WAL edit buffer usage/limit for 
replication
124af63 is described below

commit 124af6392cdebff2fe2693c572a9564dc318eee5
Author: Josh Elser <els...@apache.org>
AuthorDate: Fri Aug 7 12:59:17 2020 -0400

    HBASE-24779 Report on the WAL edit buffer usage/limit for replication
    
    Closes #2193
    
    Signed-off-by: Bharath Vissapragada <bhara...@apache.org>
    Signed-off-by: Sean Busbey <bus...@apache.org>
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../MetricsReplicationGlobalSourceSource.java      | 248 ++-------------------
 ... MetricsReplicationGlobalSourceSourceImpl.java} |  19 +-
 .../MetricsReplicationSourceFactory.java           |   2 +-
 .../MetricsReplicationSourceFactoryImpl.java       |   4 +-
 .../replication/regionserver/MetricsSource.java    |  19 +-
 .../replication/regionserver/Replication.java      |   6 +-
 .../regionserver/ReplicationSource.java            |   4 +-
 .../regionserver/ReplicationSourceManager.java     |  25 ++-
 .../regionserver/ReplicationSourceWALReader.java   |  11 +-
 .../hbase/replication/TestReplicationEndpoint.java |  45 +++-
 .../regionserver/TestWALEntryStream.java           |   5 +
 11 files changed, 140 insertions(+), 248 deletions(-)

diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 630fdb8..e373a6c 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,239 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.hadoop.metrics2.lib.MutableFastCounter;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationSourceSource{
-  private static final String KEY_PREFIX = "source.";
-
-  private final MetricsReplicationSourceImpl rms;
-
-  private final MutableHistogram ageOfLastShippedOpHist;
-  private final MutableGaugeLong sizeOfLogQueueGauge;
-  private final MutableFastCounter logReadInEditsCounter;
-  private final MutableFastCounter walEditsFilteredCounter;
-  private final MutableFastCounter shippedBatchesCounter;
-  private final MutableFastCounter shippedOpsCounter;
-  private final MutableFastCounter shippedBytesCounter;
-  private final MutableFastCounter logReadInBytesCounter;
-  private final MutableFastCounter shippedHFilesCounter;
-  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
-  private final MutableFastCounter unknownFileLengthForClosedWAL;
-  private final MutableFastCounter uncleanlyClosedWAL;
-  private final MutableFastCounter uncleanlyClosedSkippedBytes;
-  private final MutableFastCounter restartWALReading;
-  private final MutableFastCounter repeatedFileBytes;
-  private final MutableFastCounter completedWAL;
-  private final MutableFastCounter completedRecoveryQueue;
-  private final MutableFastCounter failedRecoveryQueue;
-
-  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl 
rms) {
-    this.rms = rms;
-
-    ageOfLastShippedOpHist = 
rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
-
-    sizeOfLogQueueGauge = 
rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
-
-    shippedBatchesCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
-
-    shippedOpsCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
-
-    shippedBytesCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
-
-    logReadInBytesCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
-
-    logReadInEditsCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
-
-    walEditsFilteredCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
-
-    shippedHFilesCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
-
-    sizeOfHFileRefsQueueGauge =
-        rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
-
-    unknownFileLengthForClosedWAL = rms.getMetricsRegistry()
-            .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
-    uncleanlyClosedWAL = 
rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
-    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry()
-            .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
-    restartWALReading = 
rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
-    repeatedFileBytes = 
rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
-    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 
0L);
-    completedRecoveryQueue = rms.getMetricsRegistry()
-            .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
-    failedRecoveryQueue = rms.getMetricsRegistry()
-            .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
-  }
-
-  @Override public void setLastShippedAge(long age) {
-    ageOfLastShippedOpHist.add(age);
-  }
-
-  @Override public void incrSizeOfLogQueue(int size) {
-    sizeOfLogQueueGauge.incr(size);
-  }
-
-  @Override public void decrSizeOfLogQueue(int size) {
-    sizeOfLogQueueGauge.decr(size);
-  }
-
-  @Override public void incrLogReadInEdits(long size) {
-    logReadInEditsCounter.incr(size);
-  }
-
-  @Override public void incrLogEditsFiltered(long size) {
-    walEditsFilteredCounter.incr(size);
-  }
-
-  @Override public void incrBatchesShipped(int batches) {
-    shippedBatchesCounter.incr(batches);
-  }
-
-  @Override public void incrOpsShipped(long ops) {
-    shippedOpsCounter.incr(ops);
-  }
-
-  @Override public void incrShippedBytes(long size) {
-    shippedBytesCounter.incr(size);
-  }
-
-  @Override public void incrLogReadInBytes(long size) {
-    logReadInBytesCounter.incr(size);
-  }
-
-  @Override public void clear() {
-  }
-
-  @Override
-  public long getLastShippedAge() {
-    return ageOfLastShippedOpHist.getMax();
-  }
-
-  @Override public void incrHFilesShipped(long hfiles) {
-    shippedHFilesCounter.incr(hfiles);
-  }
-
-  @Override
-  public void incrSizeOfHFileRefsQueue(long size) {
-    sizeOfHFileRefsQueueGauge.incr(size);
-  }
-
-  @Override
-  public void decrSizeOfHFileRefsQueue(long size) {
-    sizeOfHFileRefsQueueGauge.decr(size);
-  }
-
-  @Override
-  public int getSizeOfLogQueue() {
-    return (int)sizeOfLogQueueGauge.value();
-  }
-
-  @Override
-  public void incrUnknownFileLengthForClosedWAL() {
-    unknownFileLengthForClosedWAL.incr(1L);
-  }
-  @Override
-  public void incrUncleanlyClosedWALs() {
-    uncleanlyClosedWAL.incr(1L);
-  }
-  @Override
-  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
-    uncleanlyClosedSkippedBytes.incr(bytes);
-  }
-  @Override
-  public void incrRestartedWALReading() {
-    restartWALReading.incr(1L);
-  }
-  @Override
-  public void incrRepeatedFileBytes(final long bytes) {
-    repeatedFileBytes.incr(bytes);
-  }
-  @Override
-  public void incrCompletedWAL() {
-    completedWAL.incr(1L);
-  }
-  @Override
-  public void incrCompletedRecoveryQueue() {
-    completedRecoveryQueue.incr(1L);
-  }
-  @Override
-  public void incrFailedRecoveryQueue() {
-    failedRecoveryQueue.incr(1L);
-  }
-  @Override
-  public void init() {
-    rms.init();
-  }
-
-  @Override
-  public void setGauge(String gaugeName, long value) {
-    rms.setGauge(KEY_PREFIX + gaugeName, value);
-  }
-
-  @Override
-  public void incGauge(String gaugeName, long delta) {
-    rms.incGauge(KEY_PREFIX + gaugeName, delta);
-  }
-
-  @Override
-  public void decGauge(String gaugeName, long delta) {
-    rms.decGauge(KEY_PREFIX + gaugeName, delta);
-  }
-
-  @Override
-  public void removeMetric(String key) {
-    rms.removeMetric(KEY_PREFIX + key);
-  }
-
-  @Override
-  public void incCounters(String counterName, long delta) {
-    rms.incCounters(KEY_PREFIX + counterName, delta);
-  }
-
-  @Override
-  public void updateHistogram(String name, long value) {
-    rms.updateHistogram(KEY_PREFIX + name, value);
-  }
-
-  @Override
-  public String getMetricsContext() {
-    return rms.getMetricsContext();
-  }
-
-  @Override
-  public String getMetricsDescription() {
-    return rms.getMetricsDescription();
-  }
-
-  @Override
-  public String getMetricsJmxContext() {
-    return rms.getMetricsJmxContext();
-  }
-
-  @Override
-  public String getMetricsName() {
-    return rms.getMetricsName();
-  }
-
-  @Override
-  public long getWALEditsRead() {
-    return this.logReadInEditsCounter.value();
-  }
-
-  @Override
-  public long getShippedOps() {
-    return this.shippedOpsCounter.value();
-  }
-
-  @Override
-  public long getEditsFiltered() {
-    return this.walEditsFilteredCounter.value();
-  }
+public interface MetricsReplicationGlobalSourceSource extends 
MetricsReplicationSourceSource {
+
+  public static final String SOURCE_WAL_READER_EDITS_BUFFER = 
"source.walReaderEditsBufferUsage";
+
+  /**
+   * Sets the total amount of memory, in bytes, used by WAL edits that have been read and are
+   * held in memory pending replication. This measure spans all peers/sources; for example, the
+   * same WAL edits may be batched multiple times in order to replicate them to multiple peers.
+   * @param usage The memory used by edits in bytes
+   */
+  void setWALReaderEditsBufferBytes(long usage);
+
+  /**
+   * Returns the size, in bytes, of edits held in memory to be replicated 
across all peers.
+   */
+  long getWALReaderEditsBufferBytes();
 }
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
similarity index 92%
copy from 
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
copy to 
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 630fdb8..1c04109 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationSourceSource{
+public class MetricsReplicationGlobalSourceSourceImpl
+    implements MetricsReplicationGlobalSourceSource {
   private static final String KEY_PREFIX = "source.";
 
   private final MetricsReplicationSourceImpl rms;
@@ -47,8 +48,9 @@ public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationS
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
+  private final MutableGaugeLong walReaderBufferUsageBytes;
 
-  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl 
rms) {
+  public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl 
rms) {
     this.rms = rms;
 
     ageOfLastShippedOpHist = 
rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -84,6 +86,9 @@ public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationS
             .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
     failedRecoveryQueue = rms.getMetricsRegistry()
             .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+
+    walReaderBufferUsageBytes = rms.getMetricsRegistry()
+        .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -250,4 +255,14 @@ public class MetricsReplicationGlobalSourceSource 
implements MetricsReplicationS
   public long getEditsFiltered() {
     return this.walEditsFilteredCounter.value();
   }
+
+  @Override
+  public void setWALReaderEditsBufferBytes(long usage) {
+    this.walReaderBufferUsageBytes.set(usage);
+  }
+
+  @Override
+  public long getWALReaderEditsBufferBytes() {
+    return this.walReaderBufferUsageBytes.value();
+  }
 }
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 2816f83..73d2cfd 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory {
   public MetricsReplicationSinkSource getSink();
   public MetricsReplicationSourceSource getSource(String id);
   public MetricsReplicationTableSource getTableSource(String tableName);
-  public MetricsReplicationSourceSource getGlobalSource();
+  public MetricsReplicationGlobalSourceSourceImpl getGlobalSource();
 }
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index a3b3462..061fc58 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -39,7 +39,7 @@ public class MetricsReplicationSourceFactoryImpl implements 
MetricsReplicationSo
     return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, 
tableName);
   }
 
-  @Override public MetricsReplicationSourceSource getGlobalSource() {
-    return new 
MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
+  @Override public MetricsReplicationGlobalSourceSourceImpl getGlobalSource() {
+    return new 
MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 39fe7b4..0f73576 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource {
   private long timeStampNextToReplicate;
 
   private final MetricsReplicationSourceSource singleSourceSource;
-  private final MetricsReplicationSourceSource globalSourceSource;
+  private final MetricsReplicationGlobalSourceSource globalSourceSource;
   private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
 
   /**
@@ -75,7 +75,7 @@ public class MetricsSource implements BaseSource {
    * @param globalSourceSource Class to monitor global-scoped metrics
    */
   public MetricsSource(String id, MetricsReplicationSourceSource 
singleSourceSource,
-                       MetricsReplicationSourceSource globalSourceSource,
+                       MetricsReplicationGlobalSourceSource globalSourceSource,
                        Map<String, MetricsReplicationTableSource> 
singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
@@ -454,4 +454,19 @@ public class MetricsSource implements BaseSource {
   public Map<String, MetricsReplicationTableSource> 
getSingleSourceSourceByTable() {
     return singleSourceSourceByTable;
   }
+
+  /**
+   * Sets the amount of memory in bytes used in this RegionServer by edits 
pending replication.
+   */
+  public void setWALReaderEditsBufferUsage(long usageInBytes) {
+    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
+  }
+
+  /**
+   * Returns the amount of memory in bytes used in this RegionServer by edits 
pending replication.
+   * @return the WAL reader edits buffer usage, in bytes
+   */
+  public long getWALReaderEditsBufferUsage() {
+    return globalSourceSource.getWALReaderEditsBufferBytes();
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 4cbce8c..195877b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -76,6 +77,7 @@ public class Replication implements ReplicationSourceService, 
ReplicationSinkSer
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
+  private MetricsReplicationGlobalSourceSource globalMetricsSource;
 
   private PeerProcedureHandler peerProcedureHandler;
 
@@ -124,10 +126,12 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
       throw new IOException("Could not read cluster id", ke);
     }
     SyncReplicationPeerMappingManager mapping = new 
SyncReplicationPeerMappingManager();
+    this.globalMetricsSource = CompatibilitySingletonFactory
+        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
     this.replicationManager = new ReplicationSourceManager(queueStorage, 
replicationPeers,
         replicationTracker, conf, this.server, fs, logDir, oldLogDir, 
clusterId,
         walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty(),
-        mapping);
+        mapping, globalMetricsSource);
     this.syncReplicationPeerInfoProvider =
         new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
     PeerActionListener peerActionListener = PeerActionListener.DUMMY;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1d9269d..f24ecfa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -771,7 +771,9 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    totalBufferUsed.addAndGet(-batchSize);
+    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+    // Record the new buffer usage
+    
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 1a012bd..2cf91ed 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -169,6 +169,9 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   // Maximum number of retries before taking bold actions when deleting remote 
wal files for sync
   // replication peer.
   private final int maxRetriesMultiplier;
+  // Total buffer size on this RegionServer for holding batched edits to be 
shipped.
+  private final long totalBufferLimit;
+  private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
    * Creates a replication manager and sets the watch on all the other 
registered region servers
@@ -186,7 +189,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
       ReplicationPeers replicationPeers, ReplicationTracker 
replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID 
clusterId,
       WALFileLengthProvider walFileLengthProvider,
-      SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) 
throws IOException {
+      SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
+      MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
     this.sources = new ConcurrentHashMap<>();
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
@@ -222,6 +226,9 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     this.sleepForRetries = 
this.conf.getLong("replication.source.sync.sleepforretries", 1000);
     this.maxRetriesMultiplier =
       this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
+    this.totalBufferLimit = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.globalMetrics = globalMetrics;
   }
 
   /**
@@ -1070,6 +1077,14 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
+   * Returns the maximum size in bytes of edits held in memory which are 
pending replication
+   * across all sources inside this RegionServer.
+   */
+  public long getTotalBufferLimit() {
+    return totalBufferLimit;
+  }
+
+  /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
    */
@@ -1106,6 +1121,10 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    */
   public String getStats() {
     StringBuilder stats = new StringBuilder();
+    // Print stats that apply across all Replication Sources
+    stats.append("Global stats: ");
+    stats.append("WAL Edits Buffer 
Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
+        .append(getTotalBufferLimit()).append("B\n");
     for (ReplicationSourceInterface source : this.sources.values()) {
       stats.append("Normal source for cluster " + source.getPeerId() + ": ");
       stats.append(source.getStats() + "\n");
@@ -1131,4 +1150,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 7e0e550..c71db1b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread {
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
     this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
-      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 
second
     this.maxRetriesMultiplier =
@@ -276,6 +274,8 @@ class ReplicationSourceWALReader extends Thread {
   private boolean checkQuota() {
     // try not to go over total quota
     if (totalBufferUsed.get() > totalBufferQuota) {
+      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B 
exceeds limit {}B",
+          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -404,7 +404,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+    long newBufferUsed = totalBufferUsed.addAndGet(size);
+    // Record the new buffer usage
+    
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= totalBufferQuota;
   }
 
   /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 4dd264c..5a6ac0c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
@@ -329,9 +330,9 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
 
     MetricsReplicationSourceSource singleSourceSource =
       new MetricsReplicationSourceSourceImpl(singleRms, id);
-    MetricsReplicationSourceSource globalSourceSource =
-      new MetricsReplicationGlobalSourceSource(globalRms);
-    MetricsReplicationSourceSource spyglobalSourceSource = 
spy(globalSourceSource);
+    MetricsReplicationGlobalSourceSource globalSourceSource =
+      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
+    MetricsReplicationGlobalSourceSource spyglobalSourceSource = 
spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
 
     Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
@@ -497,6 +498,44 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     }
   }
 
+  /**
+   * Not used by unit tests; useful for manual testing of replication.
+   * <p>
+   * Snippet for `hbase shell`:
+   * <pre>
+   * create 't', 'f'
+   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 
'org.apache.hadoop.hbase.replication.' + \
+   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
+   * </pre>
+   */
+  public static class SleepingReplicationEndpointForTest extends 
ReplicationEndpointForTest {
+    private long duration;
+    public SleepingReplicationEndpointForTest() {
+      super();
+    }
+
+    @Override
+    public void init(Context context) throws IOException {
+      super.init(context);
+      if (this.ctx != null) {
+        duration = this.ctx.getConfiguration().getLong(
+            "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
+      }
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext context) {
+      try {
+        Thread.sleep(duration);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      return super.replicate(context);
+    }
+  }
+
   public static class InterClusterReplicationEndpointForTest
       extends HBaseInterClusterReplicationEndpoint {
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2a21660..63e7a8b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -371,6 +371,8 @@ public class TestWALEntryStream {
   private ReplicationSource mockReplicationSource(boolean recovered, 
Configuration conf) {
     ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
+        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -378,6 +380,9 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
+        MetricsReplicationGlobalSourceSource.class);
+    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }
 

Reply via email to