http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-protocol/src/main/protobuf/ClusterStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto 
b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
index 7e78395..17f79fe 100644
--- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
@@ -117,6 +117,19 @@ message RegionLoad {
 
 /* Server-level protobufs */
 
+/** Replication sink status of a region server; carried in ServerLoad.replLoadSink. */
+message ReplicationLoadSink {
+  /** Age of the last applied replication op, as reported by the sink metrics. */
+  required uint64 ageOfLastAppliedOp = 1;
+  /** Timestamp of the last applied replication op, as reported by the sink metrics. */
+  required uint64 timeStampsOfLastAppliedOp = 2;
+}
+
+/** Replication source status for one peer; carried in ServerLoad.replLoadSource. */
+message ReplicationLoadSource {
+  /** ID of the peer this source replicates to. */
+  required string peerID = 1;
+  /** Age of the last shipped op, as reported by the source metrics. */
+  required uint64 ageOfLastShippedOp = 2;
+  /** Size of the log queue, as reported by the source metrics. */
+  required uint32 sizeOfLogQueue = 3;
+  /** Timestamp of the last shipped op, as reported by the source metrics. */
+  required uint64 timeStampOfLastShippedOp = 4;
+  /** Lag estimate derived by ReplicationLoad from the three values above. */
+  required uint64 replicationLag = 5;
+}
+
 message ServerLoad {
   /** Number of requests since last report. */
   optional uint32 number_of_requests = 1;
@@ -158,6 +171,16 @@ message ServerLoad {
    * The port number that this region server is hosing an info server on.
    */
   optional uint32 info_server_port = 9;
+
+  /**
+   * The replicationLoadSource for the replication Source status of this region server.
+   */
+  repeated ReplicationLoadSource replLoadSource = 10;
+
+  /**
+   * The replicationLoadSink for the replication Sink status of this region server.
+   */
+  optional ReplicationLoadSink replLoadSink = 11;
 }
 
 message LiveServerInfo {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e8e8c1b..edb799c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -218,6 +218,7 @@ import 
org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1170,6 +1171,22 @@ public class HRegionServer implements 
ClientProtos.ClientService.BlockingInterfa
     } else {
       serverLoad.setInfoServerPort(-1);
     }
+
+    // for the replicationLoad purpose. Only need to get from one service
+    // either source or sink will get the same info
+    ReplicationSourceService rsources = getReplicationSourceService();
+
+    if (rsources != null) {
+      // always refresh first to get the latest value
+      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
+      if (rLoad != null) {
+        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
+        for (ClusterStatusProtos.ReplicationLoadSource rLS : 
rLoad.getReplicationLoadSourceList()) {
+          serverLoad.addReplLoadSource(rLS);
+        }
+      }
+    }
+
     return serverLoad.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 92ac823..25a27a9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -22,11 +22,12 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /**
- * Gateway to Cluster Replication.  
+ * Gateway to Cluster Replication.
  * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  * One such application is a cross-datacenter
  * replication service that can keep two hbase clusters in sync.
@@ -52,4 +53,9 @@ public interface ReplicationService {
    * Stops replication service.
    */
   void stopReplicationService();
+
+  /**
+   * Refreshes the replication metrics snapshot and returns it.
+   * @return the refreshed {@link ReplicationLoad}; may be null when the implementation
+   *         holds no ReplicationLoad instance, so callers must null-check the result
+   */
+  ReplicationLoad refreshAndGetReplicationLoad();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index 9946e37..b1087d1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -78,4 +78,21 @@ public class MetricsSink {
     mss.incrAppliedOps(batchSize);
   }
 
+  /**
+   * Returns the age of the last applied op as currently reported by the
+   * underlying metrics source ({@code mss}).
+   * @return ageOfLastAppliedOp
+   */
+  public long getAgeOfLastAppliedOp() {
+    return mss.getLastAppliedOpAge();
+  }
+
+  /**
+   * Get the TimeStampOfLastAppliedOp. If no replication Op applied yet, the value is the
+   * timestamp at which hbase instance starts
+   * @return timeStampsOfLastAppliedOp
+   */
+  public long getTimeStampOfLastAppliedOp() {
+    return this.lastTimestampForAge;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index a2560e6..b5dee6f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -53,6 +53,7 @@ public class MetricsSource {
   
   private long lastTimestamp = 0;
   private int lastQueueSize = 0;
+  private String id;
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
@@ -63,6 +64,7 @@ public class MetricsSource {
    * @param id Name of the source this class is monitoring
    */
   public MetricsSource(String id) {
+    this.id = id;
     singleSourceSource =
       
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
         .getSource(id);
@@ -162,4 +164,36 @@ public class MetricsSource {
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
     lastQueueSize = 0;
   }
+
+  /**
+   * Returns the age of the last shipped op as reported by the per-source metrics
+   * object ({@code singleSourceSource}).
+   * NOTE(review): the return type is the boxed {@code Long}; callers assigning the result
+   * to a primitive unbox it - confirm getLastShippedAge() can never yield null.
+   * @return AgeOfLastShippedOp
+   */
+  public Long getAgeOfLastShippedOp() {
+    return singleSourceSource.getLastShippedAge();
+  }
+
+  /**
+   * Returns the log-queue size currently recorded by this object
+   * (the {@code lastQueueSize} field).
+   * @return sizeOfLogQueue
+   */
+  public int getSizeOfLogQueue() {
+    return this.lastQueueSize;
+  }
+
+  /**
+   * Get the timeStampOfLastShippedOp
+   * @return the value of the {@code lastTimestamp} field
+   */
+  public long getTimeStampOfLastShippedOp() {
+    return lastTimestamp;
+  }
+
+  /**
+   * Get the slave peer ID, i.e. the {@code id} this MetricsSource was constructed with.
+   * @return peerID
+   */
+  public String getPeerID() {
+    return id;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b665d70..04c6f24 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -80,6 +81,8 @@ public class Replication implements WALActionsListener,
   /** Statistics thread schedule pool */
   private ScheduledExecutorService scheduleThreadPool;
   private int statsThreadPeriod;
+  // ReplicationLoad to access replication metrics
+  private ReplicationLoad replicationLoad;
 
   /**
    * Instantiate the replication management (if rep is enabled).
@@ -136,11 +139,13 @@ public class Replication implements WALActionsListener,
       this.statsThreadPeriod =
           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
+      this.replicationLoad = new ReplicationLoad();
     } else {
       this.replicationManager = null;
       this.replicationQueues = null;
       this.replicationPeers = null;
       this.replicationTracker = null;
+      this.replicationLoad = null;
     }
   }
 
@@ -334,4 +339,29 @@ public class Replication implements WALActionsListener,
       }
     }
   }
+
+  /**
+   * Rebuilds the held ReplicationLoad from the current source and sink metrics and
+   * returns it.
+   * @return the refreshed ReplicationLoad, or null when this instance holds none
+   */
+  @Override
+  public ReplicationLoad refreshAndGetReplicationLoad() {
+    if (this.replicationLoad != null) {
+      // rebuild on every call so the caller always gets the latest data
+      buildReplicationLoad();
+    }
+    return this.replicationLoad;
+  }
+
+  /**
+   * Gathers the metrics of every ReplicationSource returned by the replication manager,
+   * together with the sink metrics, and passes both to the held ReplicationLoad.
+   */
+  private void buildReplicationLoad() {
+    List<ReplicationSourceInterface> knownSources = this.replicationManager.getSources();
+    List<MetricsSource> metricsOfSources = new ArrayList<MetricsSource>();
+    for (ReplicationSourceInterface candidate : knownSources) {
+      // metrics are reached through the ReplicationSource type; anything else is skipped
+      if (!(candidate instanceof ReplicationSource)) {
+        continue;
+      }
+      metricsOfSources.add(((ReplicationSource) candidate).getSourceMetrics());
+    }
+    MetricsSink metricsOfSink = this.replicationSink.getSinkMetrics();
+    this.replicationLoad.buildReplicationLoad(metricsOfSources, metricsOfSink);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
new file mode 100644
index 0000000..af034a3
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright 2014 The Apache Software Foundation Licensed to the Apache 
Software Foundation (ASF)
+ * under one or more contributor license agreements. See the NOTICE file 
distributed with this work
+ * for additional information regarding copyright ownership. The ASF licenses 
this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may not use this file 
except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable 
law or agreed to in
+ * writing, software distributed under the License is distributed on an "AS 
IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Strings;
+
+/**
+ * This class is used for exporting some of the info from replication metrics.
+ * <p>
+ * {@link #buildReplicationLoad(List, MetricsSink)} converts the given source and sink metrics
+ * into the ClusterStatusProtos messages returned by the getters. Until a build has completed
+ * (as is the case for {@link #EMPTY_REPLICATIONLOAD}) the getters, sourceToString() and
+ * sinkToString() all return null.
+ */
[email protected]
+public class ReplicationLoad {
+
+  // Empty load instance: never built, so its getters return null.
+  public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
+
+  private List<MetricsSource> sourceMetricsList;
+  private MetricsSink sinkMetrics;
+
+  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList;
+  private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
+
+  /** default constructor */
+  public ReplicationLoad() {
+    super();
+  }
+
+  /**
+   * Builds the sink message and one source message per entry of srMetricsList, replacing
+   * any previously built state. The new state is published only after every message has
+   * been built, so a failure part-way through leaves the previous state untouched.
+   * @param srMetricsList metrics of the replication sources, one entry per source
+   * @param skMetrics metrics of the replication sink
+   */
+  public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
+      final MetricsSink skMetrics) {
+    // build the SinkLoad
+    ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
+        ClusterStatusProtos.ReplicationLoadSink.newBuilder();
+    rLoadSinkBuild.setAgeOfLastAppliedOp(skMetrics.getAgeOfLastAppliedOp());
+    rLoadSinkBuild.setTimeStampsOfLastAppliedOp(skMetrics.getTimeStampOfLastAppliedOp());
+    ClusterStatusProtos.ReplicationLoadSink newSink = rLoadSinkBuild.build();
+
+    // build the SourceLoad List
+    List<ClusterStatusProtos.ReplicationLoadSource> newSourceList =
+        new ArrayList<ClusterStatusProtos.ReplicationLoadSource>();
+    for (MetricsSource sm : srMetricsList) {
+      long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
+      int sizeOfLogQueue = sm.getSizeOfLogQueue();
+      long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
+      long timePassedAfterLastShippedOp =
+          EnvironmentEdgeManager.currentTimeMillis() - timeStampOfLastShippedOp;
+      long replicationLag =
+          computeReplicationLag(ageOfLastShippedOp, sizeOfLogQueue, timePassedAfterLastShippedOp);
+
+      ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
+          ClusterStatusProtos.ReplicationLoadSource.newBuilder();
+      rLoadSourceBuild.setPeerID(sm.getPeerID());
+      rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
+      rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
+      rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
+      rLoadSourceBuild.setReplicationLag(replicationLag);
+
+      newSourceList.add(rLoadSourceBuild.build());
+    }
+
+    // publish everything together so the fields never describe a half-built load
+    this.sourceMetricsList = srMetricsList;
+    this.sinkMetrics = skMetrics;
+    this.replicationLoadSink = newSink;
+    this.replicationLoadSourceList = newSourceList;
+  }
+
+  /**
+   * Estimates the replication lag of one source.
+   * @param ageOfLastShippedOp age of the last shipped op
+   * @param sizeOfLogQueue number of entries in the source's log queue
+   * @param timePassedAfterLastShippedOp time elapsed since the last shipped op
+   * @return the lag estimate
+   */
+  private static long computeReplicationLag(long ageOfLastShippedOp, int sizeOfLogQueue,
+      long timePassedAfterLastShippedOp) {
+    if (sizeOfLogQueue != 0) {
+      // err on the large side
+      return Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+    }
+    if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+      // last shipped happen recently
+      return ageOfLastShippedOp;
+    }
+    // last shipped may happen last night,
+    // so NO real lag although ageOfLastShippedOp is non-zero
+    return 0;
+  }
+
+  /**
+   * sourceToString
+   * @return a string contains sourceReplicationLoad information, or null when no load
+   *         has been built yet
+   */
+  public String sourceToString() {
+    // guard on the list that is actually iterated below
+    if (this.replicationLoadSourceList == null) {
+      return null;
+    }
+
+    StringBuilder sb = new StringBuilder();
+
+    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) {
+
+      sb = Strings.appendKeyValue(sb, "\n           PeerID", rls.getPeerID());
+      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
+      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
+      sb =
+          Strings.appendKeyValue(sb, "TimeStampsOfLastShippedOp",
+            (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
+      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * sinkToString
+   * @return a string contains sinkReplicationLoad information, or null when no load
+   *         has been built yet
+   */
+  public String sinkToString() {
+    if (this.replicationLoadSink == null) {
+      return null;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb =
+        Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
+          this.replicationLoadSink.getAgeOfLastAppliedOp());
+    sb =
+        Strings.appendKeyValue(sb, "TimeStampsOfLastAppliedOp",
+          (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));
+
+    return sb.toString();
+  }
+
+  /**
+   * @return the sink message from the last completed build, or null if none
+   */
+  public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
+    return this.replicationLoadSink;
+  }
+
+  /**
+   * @return the source messages from the last completed build, or null if none
+   */
+  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList() {
+    return this.replicationLoadSourceList;
+  }
+
+  /**
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 746f1fd..cd45461 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -255,4 +255,12 @@ public class ReplicationSink {
       "age in ms of last applied edit: " + 
this.metrics.refreshAgeOfLastAppliedOp() +
       ", total replicated edits: " + this.totalReplicatedEdits;
   }
+
+  /**
+   * Get replication Sink Metrics. Returns the {@code metrics} field itself, not a copy.
+   * @return MetricsSink
+   */
+  public MetricsSink getSinkMetrics() {
+    return this.metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1e7f5c9..17b8ef0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -856,4 +856,12 @@ public class ReplicationSource extends Thread
       ", currently replicating from: " + this.currentPath +
       " at position: " + position;
   }
+
+  /**
+   * Get Replication Source Metrics. Returns the {@code metrics} field itself, not a copy.
+   * @return sourceMetrics
+   */
+  public MetricsSource getSourceMetrics() {
+    return this.metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index a5ab2cd..5e02198 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -30,11 +30,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -555,4 +558,48 @@ public class TestReplicationSmallTests extends 
TestReplicationBase {
     hadmin.close();
   }
 
+  /**
+   * Test for HBASE-9531
+   * put a few rows into htable1, which should be replicated to htable2
+   * create a ClusterStatus instance 'status' from HBaseAdmin
+   * test : status.getLoad(server).getReplicationLoadSourceList()
+   * test : status.getLoad(server).getReplicationLoadSink()
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testReplicationStatus() throws Exception {
+    LOG.info("testReplicationStatus");
+
+    HBaseAdmin admin = utility1.getHBaseAdmin();
+    try {
+
+      final byte[] qualName = Bytes.toBytes("q");
+      Put p;
+
+      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+        p = new Put(Bytes.toBytes("row" + i));
+        p.add(famName, qualName, Bytes.toBytes("val" + i));
+        htable1.put(p);
+      }
+
+      ClusterStatus status = admin.getClusterStatus();
+
+      for (ServerName server : status.getServers()) {
+        ServerLoad sl = status.getLoad(server);
+        List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
+        ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
+
+        // check SourceList has at least one entry
+        assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
+
+        // check Sink exist only as it is difficult to verify the value on the fly
+        // (the message names the sink field that is actually being checked)
+        assertTrue("failed to get ReplicationLoadSink.AgeOfLastAppliedOp ",
+          (rLoadSink.getAgeOfLastAppliedOp() >= 0));
+        assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
+          (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
+      }
+    } finally {
+      admin.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb 
b/hbase-shell/src/main/ruby/hbase/admin.rb
index f267857..4964836 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -585,7 +585,7 @@ module Hbase
       end
     end
 
-    def status(format)
+    def status(format, type)
       status = @admin.getClusterStatus()
       if format == "detailed"
         puts("version %s" % [ status.getHBaseVersion() ])
@@ -612,6 +612,46 @@ module Hbase
         for server in status.getDeadServerNames()
           puts("    %s" % [ server ])
         end
+      elsif format == "replication"
+        #check whether replication is enabled or not
+        if (!@admin.getConfiguration().getBoolean(org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_KEY,
+          org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_DEFAULT))
+          puts("Please enable replication first.")
+        else
+          puts("version %s" % [ status.getHBaseVersion() ])
+          puts("%d live servers" % [ status.getServersSize() ])
+          for server in status.getServers()
+            sl = status.getLoad(server)
+            rSinkString   = "       SINK  :"
+            rSourceString = "       SOURCE:"
+            rLoadSink = sl.getReplicationLoadSink()
+            rSinkString << " AgeOfLastAppliedOp=" + 
rLoadSink.getAgeOfLastAppliedOp().to_s
+            rSinkString << ", TimeStampsOfLastAppliedOp=" + 
+                           
(java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp())).toString()
+            rLoadSourceList = sl.getReplicationLoadSourceList()
+            index = 0
+            while index < rLoadSourceList.size()
+              rLoadSource = rLoadSourceList.get(index)
+              rSourceString << " PeerID=" + rLoadSource.getPeerID()
+              rSourceString << ", AgeOfLastShippedOp=" + 
rLoadSource.getAgeOfLastShippedOp().to_s
+              rSourceString << ", SizeOfLogQueue=" + 
rLoadSource.getSizeOfLogQueue().to_s
+              rSourceString << ", TimeStampsOfLastShippedOp=" + 
+                             
(java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp())).toString()
+              rSourceString << ", Replication Lag=" + 
rLoadSource.getReplicationLag().to_s
+              index = index + 1
+            end
+            puts("    %s:" %
+            [ server.getHostname() ])
+            if type.casecmp("SOURCE") == 0
+              puts("%s" % rSourceString)
+            elsif type.casecmp("SINK") == 0
+              puts("%s" % rSinkString)
+            else
+              puts("%s" % rSourceString)
+              puts("%s" % rSinkString)
+            end
+          end
+        end
       elsif format == "simple"
         load = 0
         regions = 0

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-shell/src/main/ruby/shell/commands/status.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/status.rb 
b/hbase-shell/src/main/ruby/shell/commands/status.rb
index f72c13c..b22b272 100644
--- a/hbase-shell/src/main/ruby/shell/commands/status.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/status.rb
@@ -22,18 +22,21 @@ module Shell
     class Status < Command
       def help
         return <<-EOF
-Show cluster status. Can be 'summary', 'simple', or 'detailed'. The
+Show cluster status. Can be 'summary', 'simple', 'detailed', or 'replication'. 
The
 default is 'summary'. Examples:
 
   hbase> status
   hbase> status 'simple'
   hbase> status 'summary'
   hbase> status 'detailed'
+  hbase> status 'replication'
+  hbase> status 'replication', 'source'
+  hbase> status 'replication', 'sink'
 EOF
       end
 
-      def command(format = 'summary')
-        admin.status(format)
+      # Delegates to admin.status, passing 'type' through alongside 'format'.
+      def command(format = 'summary', type = 'both')
+        admin.status(format, type)
       end
     end
   end

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-shell/src/test/ruby/hbase/admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb 
b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index 0b12df9..50c7235 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -343,5 +343,17 @@ module Hbase
       table = table(@test_name)
       assert_not_equal(nil, table)
     end
+
+    # The three tests below are smoke tests: each only invokes the status report through
+    # the replication_status helper and makes no assertion on what it prints or returns.
+    define_test "Get replication status" do
+      replication_status("replication", "both")
+    end
+
+    define_test "Get replication source metrics information" do
+      replication_status("replication", "source")
+    end
+
+    define_test "Get replication sink metrics information" do
+      replication_status("replication", "sink")
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/hbase/blob/c391dfbd/hbase-shell/src/test/ruby/test_helper.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/test_helper.rb 
b/hbase-shell/src/test/ruby/test_helper.rb
index 0540a57..a776c23 100644
--- a/hbase-shell/src/test/ruby/test_helper.rb
+++ b/hbase-shell/src/test/ruby/test_helper.rb
@@ -90,6 +90,10 @@ module Hbase
         puts "IGNORING DROP TABLE ERROR: #{e}"
       end
     end
+
+    # Runs admin.status with the given format and type and returns its result.
+    def replication_status(format, type)
+      admin.status(format, type)
+    end
   end
 end
 

Reply via email to