This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 303db63  HBASE-24779 Report on the WAL edit buffer usage/limit for 
replication
303db63 is described below

commit 303db63b7654db499923ac80c9be46e08a25b27b
Author: Josh Elser <els...@apache.org>
AuthorDate: Fri Aug 7 12:59:17 2020 -0400

    HBASE-24779 Report on the WAL edit buffer usage/limit for replication
    
    Closes #2193
    
    Signed-off-by: Bharath Vissapragada <bhara...@apache.org>
    Signed-off-by: Sean Busbey <bus...@apache.org>
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 ...a => MetricsReplicationGlobalSourceSource.java} | 24 ++++++++----
 .../MetricsReplicationSourceFactory.java           |  2 +-
 ... MetricsReplicationGlobalSourceSourceImpl.java} | 20 ++++++++--
 .../MetricsReplicationSourceFactoryImpl.java       |  4 +-
 .../MetricsReplicationSourceSourceImpl.java        |  2 +-
 .../replication/regionserver/MetricsSource.java    | 19 ++++++++-
 .../replication/regionserver/Replication.java      |  7 +++-
 .../regionserver/ReplicationSource.java            |  4 +-
 .../regionserver/ReplicationSourceManager.java     | 25 +++++++++++-
 .../regionserver/ReplicationSourceWALReader.java   | 11 ++++--
 .../hbase/replication/TestReplicationEndpoint.java | 45 ++++++++++++++++++++--
 .../regionserver/TestWALEntryStream.java           |  5 +++
 12 files changed, 142 insertions(+), 26 deletions(-)

diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
similarity index 56%
copy from 
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
copy to 
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 2816f83..e373a6c 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,15 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public interface MetricsReplicationSourceFactory {
-  public MetricsReplicationSinkSource getSink();
-  public MetricsReplicationSourceSource getSource(String id);
-  public MetricsReplicationTableSource getTableSource(String tableName);
-  public MetricsReplicationSourceSource getGlobalSource();
+public interface MetricsReplicationGlobalSourceSource extends 
MetricsReplicationSourceSource {
+
+  public static final String SOURCE_WAL_READER_EDITS_BUFFER = 
"source.walReaderEditsBufferUsage";
+
+  /**
+   * Sets the total usage of memory used by edits in memory read from WALs. 
The memory represented
+   * by this usage measure is across peers/sources. For example, we may batch 
the same WAL edits
+   * multiple times for the sake of replicating them to multiple peers..
+   * @param usage The memory used by edits in bytes
+   */
+  void setWALReaderEditsBufferBytes(long usage);
+
+  /**
+   * Returns the size, in bytes, of edits held in memory to be replicated 
across all peers.
+   */
+  long getWALReaderEditsBufferBytes();
 }
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 2816f83..5e4ad27 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory {
   public MetricsReplicationSinkSource getSink();
   public MetricsReplicationSourceSource getSource(String id);
   public MetricsReplicationTableSource getTableSource(String tableName);
-  public MetricsReplicationSourceSource getGlobalSource();
+  public MetricsReplicationGlobalSourceSource getGlobalSource();
 }
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
similarity index 93%
rename from 
hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
rename to 
hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 8942182..52aa1b0 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationSourceSource{
+public class MetricsReplicationGlobalSourceSourceImpl
+    implements MetricsReplicationGlobalSourceSource {
   private static final String KEY_PREFIX = "source.";
 
   private final MetricsReplicationSourceImpl rms;
@@ -53,8 +54,9 @@ public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationS
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
+  private final MutableGaugeLong walReaderBufferUsageBytes;
 
-  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl 
rms) {
+  public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl 
rms) {
     this.rms = rms;
 
     ageOfLastShippedOpHist = 
rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -92,6 +94,9 @@ public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationS
             .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
     failedRecoveryQueue = rms.getMetricsRegistry()
             .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+
+    walReaderBufferUsageBytes = rms.getMetricsRegistry()
+        .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -142,7 +147,6 @@ public class MetricsReplicationGlobalSourceSource 
implements MetricsReplicationS
       }
     }
   }
-
   @Override public void incrLogReadInBytes(long size) {
     logReadInBytesCounter.incr(size);
   }
@@ -275,4 +279,14 @@ public class MetricsReplicationGlobalSourceSource 
implements MetricsReplicationS
   public long getEditsFiltered() {
     return this.walEditsFilteredCounter.value();
   }
+
+  @Override
+  public void setWALReaderEditsBufferBytes(long usage) {
+    this.walReaderBufferUsageBytes.set(usage);
+  }
+
+  @Override
+  public long getWALReaderEditsBufferBytes() {
+    return this.walReaderBufferUsageBytes.value();
+  }
 }
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index a3b3462..c0cd1c7 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -39,7 +39,7 @@ public class MetricsReplicationSourceFactoryImpl implements 
MetricsReplicationSo
     return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, 
tableName);
   }
 
-  @Override public MetricsReplicationSourceSource getGlobalSource() {
-    return new 
MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
+  @Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
+    return new 
MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
   }
 }
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index ec9271e..2af3ac9 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -162,7 +162,7 @@ public class MetricsReplicationSourceSourceImpl implements 
MetricsReplicationSou
 
   @Override public void incrShippedBytes(long size) {
     shippedBytesCounter.incr(size);
-    MetricsReplicationGlobalSourceSource
+    MetricsReplicationGlobalSourceSourceImpl
       .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index e7a8f36..3fa3253 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource {
   private long timeStampNextToReplicate;
 
   private final MetricsReplicationSourceSource singleSourceSource;
-  private final MetricsReplicationSourceSource globalSourceSource;
+  private final MetricsReplicationGlobalSourceSource globalSourceSource;
   private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
 
   /**
@@ -75,7 +75,7 @@ public class MetricsSource implements BaseSource {
    * @param globalSourceSource Class to monitor global-scoped metrics
    */
   public MetricsSource(String id, MetricsReplicationSourceSource 
singleSourceSource,
-                       MetricsReplicationSourceSource globalSourceSource,
+                       MetricsReplicationGlobalSourceSource globalSourceSource,
                        Map<String, MetricsReplicationTableSource> 
singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
@@ -465,4 +465,19 @@ public class MetricsSource implements BaseSource {
   public Map<String, MetricsReplicationTableSource> 
getSingleSourceSourceByTable() {
     return singleSourceSourceByTable;
   }
+
+  /**
+   * Sets the amount of memory in bytes used in this RegionServer by edits 
pending replication.
+   */
+  public void setWALReaderEditsBufferUsage(long usageInBytes) {
+    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
+  }
+
+  /**
+   * Returns the amount of memory in bytes used in this RegionServer by edits 
pending replication.
+   * @return
+   */
+  public long getWALReaderEditsBufferUsage() {
+    return globalSourceSource.getWALReaderEditsBufferBytes();
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index cb8e299..10ddd0c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -72,6 +73,7 @@ public class Replication implements ReplicationSourceService, 
ReplicationSinkSer
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
+  private MetricsReplicationGlobalSourceSource globalMetricsSource;
 
   private PeerProcedureHandler peerProcedureHandler;
 
@@ -119,9 +121,12 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
+    this.globalMetricsSource = CompatibilitySingletonFactory
+        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
     this.replicationManager = new ReplicationSourceManager(queueStorage, 
replicationPeers, replicationTracker, conf,
       this.server, fs, logDir, oldLogDir, clusterId,
-        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty());
+        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty(),
+        globalMetricsSource);
     if (walProvider != null) {
       walProvider
         .addWALActionsListener(new ReplicationSourceWALActionListener(conf, 
replicationManager));
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 23b7029..c495376 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -775,7 +775,9 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    totalBufferUsed.addAndGet(-batchSize);
+    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+    // Record the new buffer usage
+    
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0858eb1..d69d8ea 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -155,6 +155,9 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
 
   private AtomicLong totalBufferUsed = new AtomicLong();
+  // Total buffer size on this RegionServer for holding batched edits to be 
shipped.
+  private final long totalBufferLimit;
+  private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
    * Creates a replication manager and sets the watch on all the other 
registered region servers
@@ -171,7 +174,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker 
replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID 
clusterId,
-      WALFileLengthProvider walFileLengthProvider) throws IOException {
+      WALFileLengthProvider walFileLengthProvider,
+      MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
     // CopyOnWriteArrayList is thread-safe.
     // Generally, reading is more than modifying.
     this.sources = new ConcurrentHashMap<>();
@@ -205,6 +209,9 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     this.latestPaths = new HashSet<Path>();
     replicationForBulkLoadDataEnabled = 
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.totalBufferLimit = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.globalMetrics = globalMetrics;
   }
 
   /**
@@ -880,6 +887,14 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
+   * Returns the maximum size in bytes of edits held in memory which are 
pending replication
+   * across all sources inside this RegionServer.
+   */
+  public long getTotalBufferLimit() {
+    return totalBufferLimit;
+  }
+
+  /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
    */
@@ -916,6 +931,10 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    */
   public String getStats() {
     StringBuilder stats = new StringBuilder();
+    // Print stats that apply across all Replication Sources
+    stats.append("Global stats: ");
+    stats.append("WAL Edits Buffer 
Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
+        .append(getTotalBufferLimit()).append("B\n");
     for (ReplicationSourceInterface source : this.sources.values()) {
       stats.append("Normal source for cluster " + source.getPeerId() + ": ");
       stats.append(source.getStats() + "\n");
@@ -941,4 +960,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 70f02fa..dd5b4dc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread {
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
     this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
-      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 
second
     this.maxRetriesMultiplier =
@@ -275,6 +273,8 @@ class ReplicationSourceWALReader extends Thread {
   private boolean checkQuota() {
     // try not to go over total quota
     if (totalBufferUsed.get() > totalBufferQuota) {
+      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B 
exceeds limit {}B",
+          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -403,7 +403,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+    long newBufferUsed = totalBufferUsed.addAndGet(size);
+    // Record the new buffer usage
+    
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= totalBufferQuota;
   }
 
   /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 6fb24e0..b72975f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
@@ -339,9 +340,9 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
 
     MetricsReplicationSourceSource singleSourceSource =
       new MetricsReplicationSourceSourceImpl(singleRms, id);
-    MetricsReplicationSourceSource globalSourceSource =
-      new MetricsReplicationGlobalSourceSource(globalRms);
-    MetricsReplicationSourceSource spyglobalSourceSource = 
spy(globalSourceSource);
+    MetricsReplicationGlobalSourceSource globalSourceSource =
+      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
+    MetricsReplicationGlobalSourceSource spyglobalSourceSource = 
spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
 
     Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
@@ -507,6 +508,44 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     }
   }
 
+  /**
+   * Not used by unit tests, helpful for manual testing with replication.
+   * <p>
+   * Snippet for `hbase shell`:
+   * <pre>
+   * create 't', 'f'
+   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 
'org.apache.hadoop.hbase.replication.' + \
+   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
+   * </pre>
+   */
+  public static class SleepingReplicationEndpointForTest extends 
ReplicationEndpointForTest {
+    private long duration;
+    public SleepingReplicationEndpointForTest() {
+      super();
+    }
+
+    @Override
+    public void init(Context context) throws IOException {
+      super.init(context);
+      if (this.ctx != null) {
+        duration = this.ctx.getConfiguration().getLong(
+            "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
+      }
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext context) {
+      try {
+        Thread.sleep(duration);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      return super.replicate(context);
+    }
+  }
+
   public static class InterClusterReplicationEndpointForTest
       extends HBaseInterClusterReplicationEndpoint {
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2a21660..63e7a8b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -371,6 +371,8 @@ public class TestWALEntryStream {
   private ReplicationSource mockReplicationSource(boolean recovered, 
Configuration conf) {
     ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
+        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -378,6 +380,9 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
+        MetricsReplicationGlobalSourceSource.class);
+    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }
 

Reply via email to