This is an automated email from the ASF dual-hosted git repository.

tianjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new eedc33d  Revert "HBASE-21505 - proposal for a more consistent report 
on status"
eedc33d is described below

commit eedc33dfd277ce6b86a0ce626375e9f048c6a589
Author: Jingyun Tian <[email protected]>
AuthorDate: Wed Feb 20 18:06:42 2019 +0800

    Revert "HBASE-21505 - proposal for a more consistent report on status"
    
    This reverts commit c57802058829daa6ab8c65fc9c388535bf187283.
---
 .../java/org/apache/hadoop/hbase/ServerLoad.java   |   9 -
 .../org/apache/hadoop/hbase/ServerMetrics.java     |   6 -
 .../apache/hadoop/hbase/ServerMetricsBuilder.java  |  15 +-
 .../hbase/replication/ReplicationLoadSource.java   | 142 +-------------
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  23 +--
 .../MetricsReplicationSourceSource.java            |   3 -
 .../MetricsReplicationGlobalSourceSource.java      |  21 +--
 .../MetricsReplicationSourceSourceImpl.java        |  18 +-
 .../src/main/protobuf/ClusterStatus.proto          |   7 -
 .../hadoop/hbase/regionserver/HRegionServer.java   |   4 +-
 .../HBaseInterClusterReplicationEndpoint.java      |  12 +-
 .../replication/regionserver/MetricsSource.java    |  75 ++------
 .../replication/regionserver/Replication.java      |  21 ++-
 .../replication/regionserver/ReplicationLoad.java  |  85 ++++++---
 .../regionserver/ReplicationSource.java            |  37 +---
 .../regionserver/ReplicationSourceManager.java     |   5 -
 .../regionserver/ReplicationSourceShipper.java     |  23 ++-
 .../regionserver/ReplicationSourceWALReader.java   |   8 +-
 .../replication/regionserver/WALEntryStream.java   |   2 -
 .../hbase/master/TestGetReplicationLoad.java       |  48 +----
 .../hbase/replication/TestReplicationBase.java     |  27 +--
 .../hbase/replication/TestReplicationStatus.java   | 203 +--------------------
 hbase-shell/src/main/ruby/hbase/admin.rb           | 119 +++---------
 23 files changed, 188 insertions(+), 725 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
index f7f3204..dbf0070 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
@@ -402,15 +402,6 @@ public class ServerLoad implements ServerMetrics {
 
   /**
    * Call directly from client such as hbase shell
-   * @return a map of ReplicationLoadSource list per peer id
-   */
-  @Override
-  public Map<String, List<ReplicationLoadSource>> 
getReplicationLoadSourceMap() {
-    return metrics.getReplicationLoadSourceMap();
-  }
-
-  /**
-   * Call directly from client such as hbase shell
    * @return ReplicationLoadSink
    */
   @Override
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
index 391e62f..1e1d395 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
@@ -77,12 +77,6 @@ public interface ServerMetrics {
 
   /**
    * Call directly from client such as hbase shell
-   * @return a map of ReplicationLoadSource list per peer id
-   */
-  Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap();
-
-  /**
-   * Call directly from client such as hbase shell
    * @return ReplicationLoadSink
    */
   @Nullable
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
index 5721660..333344b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
@@ -18,11 +18,8 @@
 package org.apache.hadoop.hbase;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
-
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,7 +75,7 @@ public final class ServerMetricsBuilder {
       .setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
         
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
       .setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
-          
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
+        
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
       .setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
         ? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
         : null)
@@ -305,16 +302,6 @@ public final class ServerMetricsBuilder {
     }
 
     @Override
-    public Map<String, List<ReplicationLoadSource>> 
getReplicationLoadSourceMap(){
-      Map<String,List<ReplicationLoadSource>> sourcesMap = new HashMap<>();
-      for(ReplicationLoadSource loadSource : sources){
-        sourcesMap.computeIfAbsent(loadSource.getPeerID(),
-          peerId -> new ArrayList()).add(loadSource);
-      }
-      return sourcesMap;
-    }
-
-    @Override
     public ReplicationLoadSink getReplicationLoadSink() {
       return sink;
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
index 8ee22195..9e24e22 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
@@ -16,36 +16,21 @@ import org.apache.yetus.audience.InterfaceAudience;
  * A HBase ReplicationLoad to present MetricsSource information
  */
 @InterfaceAudience.Public
-public final class ReplicationLoadSource {
+public class ReplicationLoadSource {
   private final String peerID;
   private final long ageOfLastShippedOp;
   private final int sizeOfLogQueue;
   private final long timestampOfLastShippedOp;
   private final long replicationLag;
-  private long timeStampOfNextToReplicate;
-  private String queueId;
-  private boolean recovered;
-  private boolean running;
-  private boolean editsSinceRestart;
-  private long editsRead;
-  private long oPsShipped;
 
+  // TODO: add the builder for this class
   @InterfaceAudience.Private
-  private ReplicationLoadSource(String id, long age, int size, long timestamp,
-      long timeStampOfNextToReplicate, long lag, String queueId, boolean 
recovered, boolean running,
-      boolean editsSinceRestart, long editsRead, long oPsShipped) {
+  public ReplicationLoadSource(String id, long age, int size, long timestamp, 
long lag) {
     this.peerID = id;
     this.ageOfLastShippedOp = age;
     this.sizeOfLogQueue = size;
     this.timestampOfLastShippedOp = timestamp;
     this.replicationLag = lag;
-    this.timeStampOfNextToReplicate = timeStampOfNextToReplicate;
-    this.queueId = queueId;
-    this.recovered = recovered;
-    this.running = running;
-    this.editsSinceRestart = editsSinceRestart;
-    this.editsRead = editsRead;
-    this.oPsShipped = oPsShipped;
   }
 
   public String getPeerID() {
@@ -76,123 +61,4 @@ public final class ReplicationLoadSource {
   public long getReplicationLag() {
     return this.replicationLag;
   }
-
-  public long getTimeStampOfNextToReplicate() {
-    return this.timeStampOfNextToReplicate;
-  }
-
-  public String getQueueId() {
-    return queueId;
-  }
-
-  public boolean isRecovered() {
-    return recovered;
-  }
-
-  public boolean isRunning() {
-    return running;
-  }
-
-  public boolean hasEditsSinceRestart() {
-    return editsSinceRestart;
-  }
-
-  public long getEditsRead() {
-    return editsRead;
-  }
-
-  public long getOPsShipped() {
-    return oPsShipped;
-  }
-
-  public static ReplicationLoadSourceBuilder newBuilder(){
-    return new ReplicationLoadSourceBuilder();
-  }
-
-  public static final class ReplicationLoadSourceBuilder {
-
-    private String peerID;
-    private long ageOfLastShippedOp;
-    private int sizeOfLogQueue;
-    private long timestampOfLastShippedOp;
-    private long replicationLag;
-    private long timeStampOfNextToReplicate;
-    private String queueId;
-    private boolean recovered;
-    private boolean running;
-    private boolean editsSinceRestart;
-    private long editsRead;
-    private long oPsShipped;
-
-    private ReplicationLoadSourceBuilder(){
-
-    }
-
-    public ReplicationLoadSourceBuilder setTimeStampOfNextToReplicate(
-        long timeStampOfNextToReplicate) {
-      this.timeStampOfNextToReplicate = timeStampOfNextToReplicate;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setPeerID(String peerID) {
-      this.peerID = peerID;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setAgeOfLastShippedOp(long 
ageOfLastShippedOp) {
-      this.ageOfLastShippedOp = ageOfLastShippedOp;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setSizeOfLogQueue(int sizeOfLogQueue) {
-      this.sizeOfLogQueue = sizeOfLogQueue;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setTimestampOfLastShippedOp(long 
timestampOfLastShippedOp) {
-      this.timestampOfLastShippedOp = timestampOfLastShippedOp;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setReplicationLag(long replicationLag) 
{
-      this.replicationLag = replicationLag;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setQueueId(String queueId) {
-      this.queueId = queueId;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setRecovered(boolean recovered) {
-      this.recovered = recovered;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setRunning(boolean running) {
-      this.running = running;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setEditsSinceRestart(boolean 
editsSinceRestart) {
-      this.editsSinceRestart = editsSinceRestart;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setEditsRead(long editsRead) {
-      this.editsRead = editsRead;
-      return this;
-    }
-
-    public ReplicationLoadSourceBuilder setoPsShipped(long oPsShipped) {
-      this.oPsShipped = oPsShipped;
-      return this;
-    }
-
-    public ReplicationLoadSource build(){
-      return new ReplicationLoadSource(peerID, ageOfLastShippedOp, 
sizeOfLogQueue,
-          timestampOfLastShippedOp, timeStampOfNextToReplicate, 
replicationLag, queueId, recovered,
-          running, editsSinceRestart, editsRead, oPsShipped);
-    }
-  }
-}
\ No newline at end of file
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index a227a8f..fea81f1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2716,20 +2716,8 @@ public final class ProtobufUtil {
 
   public static ReplicationLoadSource toReplicationLoadSource(
       ClusterStatusProtos.ReplicationLoadSource rls) {
-    ReplicationLoadSource.ReplicationLoadSourceBuilder builder = 
ReplicationLoadSource.newBuilder();
-    builder.setPeerID(rls.getPeerID()).
-        setAgeOfLastShippedOp(rls.getAgeOfLastShippedOp()).
-        setSizeOfLogQueue(rls.getSizeOfLogQueue()).
-        setTimestampOfLastShippedOp(rls.getTimeStampOfLastShippedOp()).
-        setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate()).
-        setReplicationLag(rls.getReplicationLag()).
-        setQueueId(rls.getQueueId()).
-        setRecovered(rls.getRecovered()).
-        setRunning(rls.getRunning()).
-        setEditsSinceRestart(rls.getEditsSinceRestart()).
-        setEditsRead(rls.getEditsRead()).
-        setoPsShipped(rls.getOPsShipped());
-    return builder.build();
+    return new ReplicationLoadSource(rls.getPeerID(), 
rls.getAgeOfLastShippedOp(),
+      rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(), 
rls.getReplicationLag());
   }
 
   /**
@@ -3238,13 +3226,6 @@ public final class ProtobufUtil {
         .setSizeOfLogQueue((int) rls.getSizeOfLogQueue())
         .setTimeStampOfLastShippedOp(rls.getTimestampOfLastShippedOp())
         .setReplicationLag(rls.getReplicationLag())
-        .setQueueId(rls.getQueueId())
-        .setRecovered(rls.isRecovered())
-        .setRunning(rls.isRunning())
-        .setEditsSinceRestart(rls.hasEditsSinceRestart())
-        .setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate())
-        .setOPsShipped(rls.getOPsShipped())
-        .setEditsRead(rls.getEditsRead())
         .build();
   }
 
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 843a4c5..61e9431 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -78,7 +78,4 @@ public interface MetricsReplicationSourceSource extends 
BaseSource {
   void incrCompletedWAL();
   void incrCompletedRecoveryQueue();
   void incrFailedRecoveryQueue();
-  long getWALEditsRead();
-  long getShippedOps();
-  long getEditsFiltered();
 }
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 8942182..4e8c810 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -32,7 +32,7 @@ public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationS
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
   private final MutableFastCounter logReadInEditsCounter;
-  private final MutableFastCounter walEditsFilteredCounter;
+  private final MutableFastCounter logEditsFilteredCounter;
   private final MutableFastCounter shippedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
   private final MutableFastCounter shippedBytesCounter;
@@ -73,7 +73,7 @@ public class MetricsReplicationGlobalSourceSource implements 
MetricsReplicationS
 
     logReadInEditsCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
 
-    walEditsFilteredCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+    logEditsFilteredCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
 
     shippedHFilesCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
 
@@ -111,7 +111,7 @@ public class MetricsReplicationGlobalSourceSource 
implements MetricsReplicationS
   }
 
   @Override public void incrLogEditsFiltered(long size) {
-    walEditsFilteredCounter.incr(size);
+    logEditsFilteredCounter.incr(size);
   }
 
   @Override public void incrBatchesShipped(int batches) {
@@ -260,19 +260,4 @@ public class MetricsReplicationGlobalSourceSource 
implements MetricsReplicationS
   public String getMetricsName() {
     return rms.getMetricsName();
   }
-
-  @Override
-  public long getWALEditsRead() {
-    return this.logReadInEditsCounter.value();
-  }
-
-  @Override
-  public long getShippedOps() {
-    return this.shippedOpsCounter.value();
-  }
-
-  @Override
-  public long getEditsFiltered() {
-    return this.walEditsFilteredCounter.value();
-  }
 }
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index ec9271e..0ad5052 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -48,7 +48,7 @@ public class MetricsReplicationSourceSourceImpl implements 
MetricsReplicationSou
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
   private final MutableFastCounter logReadInEditsCounter;
-  private final MutableFastCounter walEditsFilteredCounter;
+  private final MutableFastCounter logEditsFilteredCounter;
   private final MutableFastCounter shippedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
   private final MutableFastCounter shippedKBsCounter;
@@ -102,7 +102,7 @@ public class MetricsReplicationSourceSourceImpl implements 
MetricsReplicationSou
     logReadInEditsCounter = 
rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
 
     logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
-    walEditsFilteredCounter = 
rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
+    logEditsFilteredCounter = 
rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
 
     shippedHFilesKey = this.keyPrefix + "shippedHFiles";
     shippedHFilesCounter = 
rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
@@ -149,7 +149,7 @@ public class MetricsReplicationSourceSourceImpl implements 
MetricsReplicationSou
   }
 
   @Override public void incrLogEditsFiltered(long size) {
-    walEditsFilteredCounter.incr(size);
+    logEditsFilteredCounter.incr(size);
   }
 
   @Override public void incrBatchesShipped(int batches) {
@@ -314,16 +314,4 @@ public class MetricsReplicationSourceSourceImpl implements 
MetricsReplicationSou
   public String getMetricsName() {
     return rms.getMetricsName();
   }
-
-  @Override public long getWALEditsRead() {
-    return this.logReadInEditsCounter.value();
-  }
-
-  @Override public long getShippedOps() {
-    return this.shippedOpsCounter.value();
-  }
-
-  @Override public long getEditsFiltered() {
-    return this.walEditsFilteredCounter.value();
-  }
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto 
b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index 3afbde3..d39db36 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -161,13 +161,6 @@ message ReplicationLoadSource {
   required uint32 sizeOfLogQueue = 3;
   required uint64 timeStampOfLastShippedOp = 4;
   required uint64 replicationLag = 5;
-  optional uint64 timeStampOfNextToReplicate=6;
-  optional string queueId = 7;
-  optional bool recovered = 8;
-  optional bool running = 9;
-  optional bool editsSinceRestart = 10;
-  optional uint64 editsRead = 11;
-  optional uint64 oPsShipped = 12;
 }
 
 message ServerLoad {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f983882..4eab37d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1395,11 +1395,9 @@ public class HRegionServer extends HasThread implements
       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
       if (rLoad != null) {
         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
-        for (ClusterStatusProtos.ReplicationLoadSource rLS :
-            rLoad.getReplicationLoadSourceEntries()) {
+        for (ClusterStatusProtos.ReplicationLoadSource rLS : 
rLoad.getReplicationLoadSourceList()) {
           serverLoad.addReplLoadSource(rLS);
         }
-
       }
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 57301fc..7db53aa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -344,6 +344,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     CompletionService<Integer> pool = new 
ExecutorCompletionService<>(this.exec);
+    String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
 
     if (!peersSelected && this.isRunning()) {
@@ -370,10 +371,19 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
         reconnectToPeerCluster();
       }
       try {
+        long lastWriteTime;
+
         // replicate the batches to sink side.
-        parallelReplicate(pool, replicateContext, batches);
+        lastWriteTime = parallelReplicate(pool, replicateContext, batches);
+
+        // update metrics
+        if (lastWriteTime > 0) {
+          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
+        }
         return true;
       } catch (IOException ioe) {
+        // Didn't ship anything, but must still age the last time we did
+        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
         if (ioe instanceof RemoteException) {
           ioe = ((RemoteException) ioe).unwrapRemoteException();
           LOG.warn("Can't replicate because of an error on the remote cluster: 
", ioe);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 92ab070..830ebe1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -41,11 +41,10 @@ public class MetricsSource implements BaseSource {
   private static final Logger LOG = 
LoggerFactory.getLogger(MetricsSource.class);
 
   // tracks last shipped timestamp for each wal group
-  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, 
Long>();
+  private Map<String, Long> lastTimestamps = new HashMap<>();
   private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
   private long lastHFileRefsQueueSize = 0;
   private String id;
-  private long timeStampNextToReplicate;
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
@@ -82,7 +81,7 @@ public class MetricsSource implements BaseSource {
 
   /**
    * Set the age of the last edit that was shipped
-   * @param timestamp target write time of the edit
+   * @param timestamp write time of the edit
    * @param walGroup which group we are setting
    */
   public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
@@ -90,7 +89,7 @@ public class MetricsSource implements BaseSource {
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
     this.ageOfLastShippedOp.put(walGroup, age);
-    this.lastShippedTimeStamps.put(walGroup, timestamp);
+    this.lastTimestamps.put(walGroup, timestamp);
   }
 
   /**
@@ -107,6 +106,15 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * get the last timestamp of given wal group. If the walGroup is null, 
return 0.
+   * @param walGroup which group we are getting
+   * @return timeStamp
+   */
+  public long getLastTimeStampOfWalGroup(String walGroup) {
+    return this.lastTimestamps.get(walGroup) == null ? 0 : 
lastTimestamps.get(walGroup);
+  }
+
+  /**
    * get age of last shipped op of given wal group. If the walGroup is null, 
return 0
    * @param walGroup which group we are getting
    * @return age
@@ -121,9 +129,9 @@ public class MetricsSource implements BaseSource {
    * @param walGroupId id of the group to update
    */
   public void refreshAgeOfLastShippedOp(String walGroupId) {
-    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
+    Long lastTimestamp = this.lastTimestamps.get(walGroupId);
     if (lastTimestamp == null) {
-      this.lastShippedTimeStamps.put(walGroupId, 0L);
+      this.lastTimestamps.put(walGroupId, 0L);
       lastTimestamp = 0L;
     }
     if (lastTimestamp > 0) {
@@ -191,30 +199,6 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
-   * Gets the number of edits not eligible for replication this source queue 
logs so far.
-   * @return logEditsFiltered non-replicable edits filtered from this queue 
logs.
-   */
-  public long getEditsFiltered(){
-    return this.singleSourceSource.getEditsFiltered();
-  }
-
-  /**
-   * Gets the number of edits eligible for replication read from this source 
queue logs so far.
-   * @return replicableEdits total number of replicable edits read from this 
queue logs.
-   */
-  public long getReplicableEdits(){
-    return this.singleSourceSource.getWALEditsRead() - 
this.singleSourceSource.getEditsFiltered();
-  }
-
-  /**
-   * Gets the number of OPs shipped by this source queue to target cluster.
-   * @return oPsShipped total number of OPs shipped by this source.
-   */
-  public long getOpsShipped() {
-    return this.singleSourceSource.getShippedOps();
-  }
-
-  /**
    * Convience method to apply changes to metrics do to shipping a batch of 
logs.
    *
    * @param batchSize the size of the batch that was shipped to sinks.
@@ -239,9 +223,8 @@ public class MetricsSource implements BaseSource {
     singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
     singleSourceSource.clear();
     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
-    lastShippedTimeStamps.clear();
+    lastTimestamps.clear();
     lastHFileRefsQueueSize = 0;
-    timeStampNextToReplicate = 0;
   }
 
   /**
@@ -277,7 +260,7 @@ public class MetricsSource implements BaseSource {
    */
   public long getTimestampOfLastShippedOp() {
     long lastTimestamp = 0L;
-    for (long ts : lastShippedTimeStamps.values()) {
+    for (long ts : lastTimestamps.values()) {
       if (ts > lastTimestamp) {
         lastTimestamp = ts;
       }
@@ -286,32 +269,6 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
-   * TimeStamp of next edit to be replicated.
-   * @return timeStampNextToReplicate - TimeStamp of next edit to be 
replicated.
-   */
-  public long getTimeStampNextToReplicate() {
-    return timeStampNextToReplicate;
-  }
-
-  /**
-   * TimeStamp of next edit targeted for replication. Used for calculating lag,
-   * as if this timestamp is greater than timestamp of last shipped, it means 
there's
-   * at least one edit pending replication.
-   * @param timeStampNextToReplicate timestamp of next edit in the queue that 
should be replicated.
-   */
-  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
-    this.timeStampNextToReplicate = timeStampNextToReplicate;
-  }
-
-  public long getReplicationDelay() {
-    if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){
-      return 0;
-    }else{
-      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
-    }
-  }
-
-  /**
    * Get the slave peer ID
    * @return peerID
    */
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 9a449ad..799d975 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -274,12 +274,25 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
   }
 
   private void buildReplicationLoad() {
-    List<ReplicationSourceInterface> allSources = new ArrayList<>();
-    allSources.addAll(this.replicationManager.getSources());
-    allSources.addAll(this.replicationManager.getOldSources());
+    List<MetricsSource> sourceMetricsList = new ArrayList<>();
+
+    // get source
+    List<ReplicationSourceInterface> sources = 
this.replicationManager.getSources();
+    for (ReplicationSourceInterface source : sources) {
+      sourceMetricsList.add(source.getSourceMetrics());
+    }
+
+    // get old source
+    List<ReplicationSourceInterface> oldSources = 
this.replicationManager.getOldSources();
+    for (ReplicationSourceInterface source : oldSources) {
+      if (source instanceof ReplicationSource) {
+        sourceMetricsList.add(source.getSourceMetrics());
+      }
+    }
+
     // get sink
     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
-    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
+    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index fe4086b..53e560b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -19,14 +19,15 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.util.Strings;
+import java.util.Map;
 
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Strings;
 
 /**
  * This class is used for exporting some of the info from replication metrics
@@ -36,9 +37,11 @@ public class ReplicationLoad {
 
   // Empty load instance.
   public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new 
ReplicationLoad();
+
+  private List<MetricsSource> sourceMetricsList;
   private MetricsSink sinkMetrics;
 
-  private List<ClusterStatusProtos.ReplicationLoadSource> 
replicationLoadSourceEntries;
+  private List<ClusterStatusProtos.ReplicationLoadSource> 
replicationLoadSourceList;
   private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
 
   /** default constructor */
@@ -48,12 +51,13 @@ public class ReplicationLoad {
 
   /**
    * buildReplicationLoad
-   * @param sources List of ReplicationSource instances for which metrics 
should be reported
+   * @param srMetricsList
    * @param skMetrics
    */
 
-  public void buildReplicationLoad(final List<ReplicationSourceInterface> 
sources,
+  public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
       final MetricsSink skMetrics) {
+    this.sourceMetricsList = srMetricsList;
     this.sinkMetrics = skMetrics;
 
     // build the SinkLoad
@@ -63,9 +67,10 @@ public class ReplicationLoad {
     
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
     this.replicationLoadSink = rLoadSinkBuild.build();
 
-    this.replicationLoadSourceEntries = new ArrayList<>();
-    for (ReplicationSourceInterface source : sources) {
-      MetricsSource sm = source.getSourceMetrics();
+    // build the SourceLoad List
+    Map<String, ClusterStatusProtos.ReplicationLoadSource> 
replicationLoadSourceMap =
+        new HashMap<>();
+    for (MetricsSource sm : this.sourceMetricsList) {
       // Get the actual peer id
       String peerId = sm.getPeerID();
       String[] parts = peerId.split("-", 2);
@@ -73,11 +78,18 @@ public class ReplicationLoad {
 
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
-      long editsRead = sm.getReplicableEdits();
-      long oPsShipped = sm.getOpsShipped();
       long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
-      long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate();
-      long replicationLag = sm.getReplicationDelay();
+      long replicationLag =
+          calculateReplicationDelay(ageOfLastShippedOp, 
timeStampOfLastShippedOp, sizeOfLogQueue);
+
+      ClusterStatusProtos.ReplicationLoadSource rLoadSource = 
replicationLoadSourceMap.get(peerId);
+      if (rLoadSource != null) {
+        ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), 
ageOfLastShippedOp);
+        sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
+        timeStampOfLastShippedOp = 
Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
+          timeStampOfLastShippedOp);
+        replicationLag = Math.max(rLoadSource.getReplicationLag(), 
replicationLag);
+      }
       ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
           ClusterStatusProtos.ReplicationLoadSource.newBuilder();
       rLoadSourceBuild.setPeerID(peerId);
@@ -85,19 +97,33 @@ public class ReplicationLoad {
       rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
       rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
       rLoadSourceBuild.setReplicationLag(replicationLag);
-      
rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate);
-      rLoadSourceBuild.setEditsRead(editsRead);
-      rLoadSourceBuild.setOPsShipped(oPsShipped);
-      if (source instanceof ReplicationSource){
-        ReplicationSource replSource = (ReplicationSource)source;
-        
rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered());
-        
rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId());
-        rLoadSourceBuild.setRunning(replSource.isWorkerRunning());
-        rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate>0);
-      }
 
-      this.replicationLoadSourceEntries.add(rLoadSourceBuild.build());
+      replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
+    }
+    this.replicationLoadSourceList = new 
ArrayList<>(replicationLoadSourceMap.values());
+  }
+
+  static long calculateReplicationDelay(long ageOfLastShippedOp,
+      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
+    long replicationLag;
+    long timePassedAfterLastShippedOp;
+    if (timeStampOfLastShippedOp == 0) { //replication not start yet, set to 
Long.MAX_VALUE
+      return Long.MAX_VALUE;
+    } else {
+      timePassedAfterLastShippedOp =
+          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+    }
+    if (sizeOfLogQueue > 1) {
+      // err on the large side
+      replicationLag = Math.max(ageOfLastShippedOp, 
timePassedAfterLastShippedOp);
+    } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+      replicationLag = ageOfLastShippedOp; // last shipped happen recently
+    } else {
+      // last shipped may happen last night,
+      // so NO real lag although ageOfLastShippedOp is non-zero
+      replicationLag = 0;
     }
+    return replicationLag;
   }
 
   /**
@@ -105,17 +131,18 @@ public class ReplicationLoad {
    * @return a string contains sourceReplicationLoad information
    */
   public String sourceToString() {
+    if (this.sourceMetricsList == null) return null;
+
     StringBuilder sb = new StringBuilder();
 
-    for (ClusterStatusProtos.ReplicationLoadSource rls :
-        this.replicationLoadSourceEntries) {
+    for (ClusterStatusProtos.ReplicationLoadSource rls : 
this.replicationLoadSourceList) {
 
       sb = Strings.appendKeyValue(sb, "\n           PeerID", rls.getPeerID());
       sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", 
rls.getAgeOfLastShippedOp());
       sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", 
rls.getSizeOfLogQueue());
       sb =
           Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
-              (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
+            (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
       sb = Strings.appendKeyValue(sb, "Replication Lag", 
rls.getReplicationLag());
     }
 
@@ -144,8 +171,8 @@ public class ReplicationLoad {
     return this.replicationLoadSink;
   }
 
-  public List<ClusterStatusProtos.ReplicationLoadSource> 
getReplicationLoadSourceEntries() {
-    return this.replicationLoadSourceEntries;
+  public List<ClusterStatusProtos.ReplicationLoadSource> 
getReplicationLoadSourceList() {
+    return this.replicationLoadSourceList;
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8e001e6..f1b6e76 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -179,7 +179,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 
10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
-    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
+    LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", 
queueId,
       replicationPeer.getId(), this.currentBandwidth);
   }
 
@@ -209,8 +209,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     } else {
       queue.put(log);
     }
-    LOG.trace("Added log file {} to queue of source {}.", logPrefix,
-        this.replicationQueueInfo.getQueueId());
+
     this.metrics.incrSizeOfLogQueue();
     // This will log a warning for each new log that gets created above the 
warn threshold
     int queueSize = queue.size();
@@ -320,13 +319,15 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   @Override
   public Map<String, ReplicationStatus> getWalGroupStatus() {
     Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
-    long ageOfLastShippedOp, replicationDelay, fileSize;
+    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
     for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : 
workerThreads.entrySet()) {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
+      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
       ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
       int queueSize = queues.get(walGroupId).size();
-      replicationDelay = metrics.getReplicationDelay();
+      replicationDelay =
+          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, 
lastTimeStamp, queueSize);
       Path currentPath = shipper.getCurrentPath();
       fileSize = -1;
       if (currentPath != null) {
@@ -480,8 +481,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     for (;;) {
       peerClusterId = replicationEndpoint.getPeerUUID();
       if (this.isSourceActive() && peerClusterId == null) {
-        LOG.debug("Could not connect to Peer ZK. Sleeping for "
-            + (this.sleepForRetries * sleepMultiplier) + " millis.");
         if (sleepForRetries("Cannot contact the peer's zk ensemble", 
sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -499,8 +498,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       this.manager.removeSource(this);
       return;
     }
-    LOG.info("Source: {}, is now replicating from cluster: {}; to peer 
cluster: {};",
-        this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
+    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
 
     initializeWALEntryFilter(peerClusterId);
     // start workers
@@ -549,9 +547,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
-      if(worker.entryReader != null) {
-        worker.entryReader.setReaderRunning(false);
-      }
+      worker.entryReader.setReaderRunning(false);
     }
 
     for (ReplicationSourceShipper worker : workers) {
@@ -617,10 +613,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     return !this.server.isStopped() && this.sourceRunning;
   }
 
-  public UUID getPeerClusterUUID(){
-    return this.clusterId;
-  }
-
   /**
    * Comparator used to compare logs together based on their start time
    */
@@ -646,19 +638,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     }
   }
 
-  public ReplicationQueueInfo getReplicationQueueInfo() {
-    return replicationQueueInfo;
-  }
-
-  public boolean isWorkerRunning(){
-    for(ReplicationSourceShipper worker : this.workerThreads.values()){
-      if(worker.isActive()){
-        return worker.isActive();
-      }
-    }
-    return false;
-  }
-
   @Override
   public String getStats() {
     StringBuilder sb = new StringBuilder();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 12a903a..20c1215 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -381,7 +381,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
           abortAndThrowIOExceptionWhenFail(
             () -> this.queueStorage.addWAL(server.getServerName(), peerId, 
walPath.getName()));
           src.enqueueLog(walPath);
-          LOG.trace("Enqueued {} to source {} during source creation.", 
walPath, src.getQueueId());
         }
       }
     }
@@ -790,8 +789,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
-      LOG.trace("Enqueued {} to source {} while performing postLogRoll 
operation.",
-          newLog, source.getQueueId());
     }
   }
 
@@ -963,9 +960,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
               wals.add(wal);
             }
             oldsources.add(src);
-            LOG.trace("Added source for recovered queue: " + src.getQueueId());
             for (String wal : walsSet) {
-              LOG.trace("Enqueueing log from recovered queue for source: " + 
src.getQueueId());
               src.enqueueLog(new Path(oldLogDir, wal));
             }
             src.startup();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 17e1167..5d6198e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -91,7 +92,6 @@ public class ReplicationSourceShipper extends Thread {
   @Override
   public final void run() {
     setWorkerState(WorkerState.RUNNING);
-    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", 
this.walGroupId);
     // Loop until we close down
     while (isActive()) {
       // Sleep until replication is enabled again
@@ -103,9 +103,10 @@ public class ReplicationSourceShipper extends Thread {
       }
       try {
         WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
-        LOG.debug("Shipper from source {} got entry batch from reader: {}",
-            source.getQueueId(), entryBatch);
         if (entryBatch == null) {
+          // since there is no logs need to replicate, we refresh the 
ageOfLastShippedOp
+          
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+            walGroupId);
           continue;
         }
         // the NO_MORE_DATA instance has no path so do not call shipEdits
@@ -162,13 +163,16 @@ public class ReplicationSourceShipper extends Thread {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
+      if (updateLogPosition(entryBatch)) {
+        // if there was nothing to ship and it's not an error
+        // set "ageOfLastShippedOp" to <now> to indicate that we're current
+        
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+          walGroupId);
+      }
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
     int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
-    
source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size()
 - 1)
-        .getKey().getWriteTime());
     while (isActive()) {
       try {
         try {
@@ -180,6 +184,7 @@ public class ReplicationSourceShipper extends Thread {
           // directly go back to while() for confirm this
           continue;
         }
+
         // create replicateContext here, so the entries can be GC'd upon 
return from this call
         // stack
         ReplicationEndpoint.ReplicateContext replicateContext =
@@ -200,7 +205,7 @@ public class ReplicationSourceShipper extends Thread {
         // Clean up hfile references
         for (Entry entry : entries) {
           cleanUpHFileRefs(entry.getEdit());
-          LOG.trace("shipped entry {}: ", entry);
+
           TableName tableName = entry.getKey().getTableName();
           
source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
               tableName.getNameAsString());
@@ -219,7 +224,7 @@ public class ReplicationSourceShipper extends Thread {
         source.getSourceMetrics().setAgeOfLastShippedOp(
           entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
         if (LOG.isTraceEnabled()) {
-          LOG.debug("Replicated {} entries or {} operations in {} ms",
+          LOG.trace("Replicated {} entries or {} operations in {} ms",
               entries.size(), entryBatch.getNbOperations(), (endTimeNs - 
startTimeNs) / 1000000);
         }
         break;
@@ -304,7 +309,7 @@ public class ReplicationSourceShipper extends Thread {
     return 0;
   }
 
-  protected boolean isActive() {
+  private boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && 
!isInterrupted();
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 3866513..d9f1785 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -140,7 +140,7 @@ class ReplicationSourceWALReader extends Thread {
           if (batch != null) {
             // need to propagate the batch even it has no entries since it may 
carry the last
             // sequence id information for serial replication.
-            LOG.debug("Read {} WAL entries eligible for replication", 
batch.getNbEntries());
+            LOG.trace("Read {} WAL entries eligible for replication", 
batch.getNbEntries());
             entryBatchQueue.put(batch);
             sleepMultiplier = 1;
           } else { // got no entries and didn't advance position in WAL
@@ -168,11 +168,8 @@ class ReplicationSourceWALReader extends Thread {
   protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
     WALEdit edit = entry.getEdit();
     if (edit == null || edit.isEmpty()) {
-      LOG.debug("Edit null or empty for entry {} ", entry);
       return false;
     }
-    LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for 
source queue: {}",
-        entry.getKey().getWriteTime(), entry.getKey(), 
this.source.getQueueId());
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry);
@@ -288,8 +285,7 @@ class ReplicationSourceWALReader extends Thread {
 
   protected final Entry filterEntry(Entry entry) {
     Entry filtered = filter.filter(entry);
-    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) 
{
-      LOG.debug("Filtered entry for replication: {}", entry);
+    if (entry != null && filtered == null) {
       source.getSourceMetrics().incrLogEditsFiltered();
     }
     return filtered;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 24d7fb9..2201932 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -174,7 +174,6 @@ class WALEntryStream implements Closeable {
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
       boolean beingWritten = readNextEntryAndRecordReaderPosition();
-      LOG.trace("reading wal file {}. Current open for write: {}", 
this.currentPath, beingWritten);
       if (currentEntry == null && !beingWritten) {
         // no more entries in this log file, and the file is already closed, 
i.e, rolled
         // Before dequeueing, we should always get one more attempt at reading.
@@ -274,7 +273,6 @@ class WALEntryStream implements Closeable {
       return true;
     }
     if (readEntry != null) {
-      LOG.trace("reading entry: {} ", readEntry);
       metrics.incrLogEditsRead();
       metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
index ac9ae39..bf72383 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
@@ -86,52 +86,22 @@ public class TestGetReplicationLoad {
 
   @Test
   public void testGetReplicationMetrics() throws Exception {
-    String peer1 = "test1", peer2 = "test2", queueId="1";
-    long ageOfLastShippedOp = 2,
-        replicationLag = 3,
-        timeStampOfLastShippedOp = 4,
-        timeStampOfNextToReplicate=5,
-        editsRead=6,
-        oPsShipped=7;
-    int sizeOfLogQueue = 8;
-    boolean recovered=false,
-        running=false,
-        editsSinceRestart=false;
+    String peer1 = "test1", peer2 = "test2";
+    long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp 
= 4;
+    int sizeOfLogQueue = 5;
     RegionServerStatusProtos.RegionServerReportRequest.Builder request =
         RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
     ServerName serverName = cluster.getMaster(0).getServerName();
     request.setServer(ProtobufUtil.toServerName(serverName));
     ClusterStatusProtos.ReplicationLoadSource rload1 = 
ClusterStatusProtos.ReplicationLoadSource
-        .newBuilder().setPeerID(peer1)
-        .setAgeOfLastShippedOp(ageOfLastShippedOp)
-        .setReplicationLag(replicationLag)
-        .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
-        .setSizeOfLogQueue(sizeOfLogQueue)
-        .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate)
-        .setQueueId(queueId)
-        .setEditsRead(editsRead)
-        .setOPsShipped(oPsShipped)
-        .setRunning(running)
-        .setRecovered(recovered)
-        .setEditsSinceRestart(editsSinceRestart)
-        .build();
+        
.newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
+        
.setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
+        .setSizeOfLogQueue(sizeOfLogQueue).build();
     ClusterStatusProtos.ReplicationLoadSource rload2 =
-        ClusterStatusProtos.ReplicationLoadSource
-            .newBuilder()
-            .setPeerID(peer2)
-            .setAgeOfLastShippedOp(ageOfLastShippedOp + 1)
-            .setReplicationLag(replicationLag + 1)
+        ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
+            .setAgeOfLastShippedOp(ageOfLastShippedOp + 
1).setReplicationLag(replicationLag + 1)
             .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
-            .setSizeOfLogQueue(sizeOfLogQueue + 1)
-            .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate+1)
-            .setQueueId(queueId)
-            .setEditsRead(editsRead+1)
-            .setOPsShipped(oPsShipped+1)
-            .setRunning(running)
-            .setRecovered(recovered)
-            .setEditsSinceRestart(editsSinceRestart)
-            .build();
-
+            .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
     ClusterStatusProtos.ServerLoad sl = 
ClusterStatusProtos.ServerLoad.newBuilder()
         .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
     request.setLoad(sl);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index dd60597..cd84293 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -173,7 +173,8 @@ public class TestReplicationBase {
     htable1.put(puts);
   }
 
-  protected static void configureClusters(){
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     // We don't want too many edits per batch sent to the ReplicationEndpoint 
to trigger
     // sufficient number of events. But we don't want to go too low because
@@ -195,17 +196,6 @@ public class TestReplicationBase {
     conf1.setLong("hbase.serial.replication.waiting.ms", 100);
 
     utility1 = new HBaseTestingUtility(conf1);
-
-    // Base conf2 on conf1 so it gets the right zk cluster.
-    conf2 = HBaseConfiguration.create(conf1);
-    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
-    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-
-    utility2 = new HBaseTestingUtility(conf2);
-  }
-
-  protected static void startClusters() throws Exception{
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
     // Have to reget conf1 in case zk cluster location different
@@ -215,6 +205,13 @@ public class TestReplicationBase {
     admin = new ReplicationAdmin(conf1);
     LOG.info("Setup first Zk");
 
+    // Base conf2 on conf1 so it gets the right zk cluster.
+    conf2 = HBaseConfiguration.create(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+
+    utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
     zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
     LOG.info("Setup second Zk");
@@ -249,12 +246,6 @@ public class TestReplicationBase {
     htable2 = connection2.getTable(tableName);
   }
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    configureClusters();
-    startClusters();
-  }
-
   private boolean peerExist(String peerId) throws IOException {
     return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> 
peerId.equals(p.getPeerId()));
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index c778f52..aaa843e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.util.EnumSet;
@@ -34,10 +33,6 @@ import 
org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -65,8 +60,7 @@ public class TestReplicationStatus extends 
TestReplicationBase {
   @Test
   public void testReplicationStatus() throws Exception {
     LOG.info("testReplicationStatus");
-    utility2.shutdownMiniHBaseCluster();
-    utility2.startMiniHBaseCluster(1,4);
+
     try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
       // disable peer
       admin.disablePeer(PEER_ID);
@@ -109,204 +103,11 @@ public class TestReplicationStatus extends 
TestReplicationBase {
       ServerLoad sl = status.getLoad(server);
       List<ReplicationLoadSource> rLoadSourceList = 
sl.getReplicationLoadSourceList();
       // check SourceList still only has one entry
-      assertTrue("failed to get ReplicationLoadSourceList", 
(rLoadSourceList.size() == 2));
+      assertTrue("failed to get ReplicationLoadSourceList", 
(rLoadSourceList.size() == 1));
       assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
     } finally {
       admin.enablePeer(PEER_ID);
       utility1.getHBaseCluster().getRegionServer(1).start();
     }
   }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    //we need to perform initialisations from 
TestReplicationBase.setUpBeforeClass() on each
-    //test here, so we override BeforeClass to do nothing and call
-    // TestReplicationBase.setUpBeforeClass() from setup method
-    TestReplicationBase.configureClusters();
-  }
-
-  @Before
-  @Override
-  public void setUpBase() throws Exception {
-    TestReplicationBase.startClusters();
-    super.setUpBase();
-  }
-
-  @After
-  @Override
-  public void tearDownBase() throws Exception {
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass(){
-    //We need to override it here to avoid issues when trying to execute super 
class teardown
-  }
-
-  @Test
-  public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws 
Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    Thread.sleep(10000);
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(1, loadSources.size());
-    ReplicationLoadSource loadSource = loadSources.get(0);
-    assertFalse(loadSource.hasEditsSinceRestart());
-    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-    assertEquals(0, loadSource.getReplicationLag());
-    assertFalse(loadSource.isRecovered());
-  }
-
-  @Test
-  public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws 
Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    //add some values to source cluster
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    Thread.sleep(10000);
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(1, loadSources.size());
-    ReplicationLoadSource loadSource = loadSources.get(0);
-    assertTrue(loadSource.hasEditsSinceRestart());
-    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-    assertTrue(loadSource.getReplicationLag()>0);
-    assertFalse(loadSource.isRecovered());
-  }
-
-  @Test
-  public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() 
throws Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    //add some values to cluster 1
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    Thread.sleep(10000);
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    Thread.sleep(10000);
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(2, loadSources.size());
-    boolean foundRecovery = false;
-    boolean foundNormal = false;
-    for(ReplicationLoadSource loadSource : loadSources){
-      if (loadSource.isRecovered()){
-        foundRecovery = true;
-        assertTrue(loadSource.hasEditsSinceRestart());
-        assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-        assertTrue(loadSource.getReplicationLag()>0);
-      } else {
-        foundNormal = true;
-        assertFalse(loadSource.hasEditsSinceRestart());
-        assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-        assertEquals(0, loadSource.getReplicationLag());
-      }
-    }
-    assertTrue("No normal queue found.", foundNormal);
-    assertTrue("No recovery queue found.", foundRecovery);
-  }
-
-  @Test
-  public void testReplicationStatusBothNormalAndRecoveryLagging() throws 
Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    //add some values to cluster 1
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    Thread.sleep(10000);
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    Thread.sleep(10000);
-    //add more values to cluster 1, these should cause normal queue to lag
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    Thread.sleep(10000);
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(2, loadSources.size());
-    boolean foundRecovery = false;
-    boolean foundNormal = false;
-    for(ReplicationLoadSource loadSource : loadSources){
-      if (loadSource.isRecovered()){
-        foundRecovery = true;
-      } else {
-        foundNormal = true;
-      }
-      assertTrue(loadSource.hasEditsSinceRestart());
-      assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-      assertTrue(loadSource.getReplicationLag()>0);
-    }
-    assertTrue("No normal queue found.", foundNormal);
-    assertTrue("No recovery queue found.", foundRecovery);
-  }
-
-  @Test
-  public void testReplicationStatusAfterLagging() throws Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    //add some values to cluster 1
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    utility2.startMiniHBaseCluster();
-    Thread.sleep(10000);
-    try(Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
-      ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).
-          getServerName();
-      ClusterStatus status =
-          new 
ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-      List<ReplicationLoadSource> loadSources = 
status.getLiveServerMetrics().get(serverName).
-          getReplicationLoadSourceList();
-      assertEquals(1, loadSources.size());
-      ReplicationLoadSource loadSource = loadSources.get(0);
-      assertTrue(loadSource.hasEditsSinceRestart());
-      assertTrue(loadSource.getTimestampOfLastShippedOp() > 0);
-      assertEquals(0, loadSource.getReplicationLag());
-    }finally{
-      utility2.shutdownMiniHBaseCluster();
-    }
-  }
 }
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb 
b/hbase-shell/src/main/ruby/hbase/admin.rb
index 8e1b603..f716de9 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -20,7 +20,6 @@
 include Java
 java_import java.util.Arrays
 java_import java.util.regex.Pattern
-java_import java.util.Date
 java_import org.apache.hadoop.hbase.util.Pair
 java_import org.apache.hadoop.hbase.util.RegionSplitter
 java_import org.apache.hadoop.hbase.util.Bytes
@@ -786,30 +785,37 @@ module Hbase
           puts(format('    %s', server))
         end
       elsif format == 'replication'
-        puts(format('version %<version>s', version: status.getHBaseVersion))
-        puts(format('%<servers>d live servers', servers: 
status.getServersSize))
-        status.getServers.each do |server_status|
-          sl = status.getLoad(server_status)
-          r_sink_string   = '      SINK:'
-          r_source_string = '       SOURCE:'
-          r_load_sink = sl.getReplicationLoadSink
-          next if r_load_sink.nil?
-
-          r_sink_string << ' AgeOfLastAppliedOp=' +
-                           r_load_sink.getAgeOfLastAppliedOp.to_s
-          r_sink_string << ', TimeStampsOfLastAppliedOp=' +
-                           Date.new(r_load_sink
-                             .getTimeStampsOfLastAppliedOp).toString
-          r_load_source_map = sl.getReplicationLoadSourceMap
-          build_source_string(r_load_source_map, r_source_string)
-          puts(format('    %<host>s:', host: server_status.getHostname))
-          if type.casecmp('SOURCE').zero?
-            puts(format('%<source>s', source: r_source_string))
-          elsif type.casecmp('SINK').zero?
-            puts(format('%<sink>s', sink: r_sink_string))
+        puts(format('version %s', status.getHBaseVersion))
+        puts(format('%d live servers', status.getServersSize))
+        for server in status.getServers
+          sl = status.getLoad(server)
+          rSinkString   = '       SINK  :'
+          rSourceString = '       SOURCE:'
+          rLoadSink = sl.getReplicationLoadSink
+          next if rLoadSink.nil?
+          rSinkString << ' AgeOfLastAppliedOp=' + 
rLoadSink.getAgeOfLastAppliedOp.to_s
+          rSinkString << ', TimeStampsOfLastAppliedOp=' +
+                         
java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp).toString
+          rLoadSourceList = sl.getReplicationLoadSourceList
+          index = 0
+          while index < rLoadSourceList.size
+            rLoadSource = rLoadSourceList.get(index)
+            rSourceString << ' PeerID=' + rLoadSource.getPeerID
+            rSourceString << ', AgeOfLastShippedOp=' + 
rLoadSource.getAgeOfLastShippedOp.to_s
+            rSourceString << ', SizeOfLogQueue=' + 
rLoadSource.getSizeOfLogQueue.to_s
+            rSourceString << ', TimeStampsOfLastShippedOp=' +
+                             
java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp).toString
+            rSourceString << ', Replication Lag=' + 
rLoadSource.getReplicationLag.to_s
+            index += 1
+          end
+          puts(format('    %s:', server.getHostname))
+          if type.casecmp('SOURCE') == 0
+            puts(format('%s', rSourceString))
+          elsif type.casecmp('SINK') == 0
+            puts(format('%s', rSinkString))
           else
-            puts(format('%<source>s', source: r_source_string))
-            puts(format('%<sink>s', sink: r_sink_string))
+            puts(format('%s', rSourceString))
+            puts(format('%s', rSinkString))
           end
         end
       elsif format == 'simple'
@@ -838,71 +844,6 @@ module Hbase
       end
     end
 
-    def build_source_string(r_load_source_map, r_source_string)
-      r_load_source_map.each do |peer, sources|
-        r_source_string << ' PeerID=' + peer
-        sources.each do |source_load|
-          build_queue_title(source_load, r_source_string)
-          build_running_source_stats(source_load, r_source_string)
-        end
-      end
-    end
-
-    def build_queue_title(source_load, r_source_string)
-      r_source_string << if source_load.isRecovered
-                           "\n         Recovered Queue: "
-                         else
-                           "\n         Normal Queue: "
-                         end
-      r_source_string << source_load.getQueueId
-    end
-
-    def build_running_source_stats(source_load, r_source_string)
-      if source_load.isRunning
-        build_shipped_stats(source_load, r_source_string)
-        build_load_general_stats(source_load, r_source_string)
-        r_source_string << ', Replication Lag=' +
-                           source_load.getReplicationLag.to_s
-      else
-        r_source_string << "\n           "
-        r_source_string << 'No Reader/Shipper threads runnning yet.'
-      end
-    end
-
-    def build_shipped_stats(source_load, r_source_string)
-      r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
-                           "\n           " \
-                           'No Ops shipped since last restart'
-                         else
-                           "\n           AgeOfLastShippedOp=" +
-                           source_load.getAgeOfLastShippedOp.to_s +
-                           ', TimeStampOfLastShippedOp=' +
-                           Date.new(source_load
-                             .getTimeStampOfLastShippedOp).toString
-                         end
-    end
-
-    def build_load_general_stats(source_load, r_source_string)
-      r_source_string << ', SizeOfLogQueue=' +
-                         source_load.getSizeOfLogQueue.to_s
-      r_source_string << ', EditsReadFromLogQueue=' +
-                         source_load.getEditsRead.to_s
-      r_source_string << ', OpsShippedToTarget=' +
-                         source_load.getOPsShipped.to_s
-      build_edits_for_source(source_load, r_source_string)
-    end
-
-    def build_edits_for_source(source_load, r_source_string)
-      if source_load.hasEditsSinceRestart
-        r_source_string << ', TimeStampOfNextToReplicate=' +
-                           Date.new(source_load
-                             .getTimeStampOfNextToReplicate).toString
-      else
-        r_source_string << ', No edits for this source'
-        r_source_string << ' since it started'
-      end
-    end
-
     
#----------------------------------------------------------------------------------------------
     #
     # Helper methods

Reply via email to