Repository: hbase Updated Branches: refs/heads/0.98 0ffb197a6 -> c391dfbd7 refs/heads/branch-1 7841bf73b -> f5b40200d refs/heads/branch-1.0 bdb938ba2 -> c0e26c3a0
http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-protocol/src/main/protobuf/ClusterStatus.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto index 7e78395..17f79fe 100644 --- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto @@ -117,6 +117,19 @@ message RegionLoad { /* Server-level protobufs */ +message ReplicationLoadSink { + required uint64 ageOfLastAppliedOp = 1; + required uint64 timeStampsOfLastAppliedOp = 2; +} + +message ReplicationLoadSource { + required string peerID = 1; + required uint64 ageOfLastShippedOp = 2; + required uint32 sizeOfLogQueue = 3; + required uint64 timeStampOfLastShippedOp = 4; + required uint64 replicationLag = 5; +} + message ServerLoad { /** Number of requests since last report. */ optional uint32 number_of_requests = 1; @@ -158,6 +171,16 @@ message ServerLoad { * The port number that this region server is hosing an info server on. */ optional uint32 info_server_port = 9; + + /** + * The replicationLoadSource for the replication Source status of this region server. + */ + repeated ReplicationLoadSource replLoadSource = 10; + + /** + * The replicationLoadSink for the replication Sink status of this region server. 
+ */ + optional ReplicationLoadSink replLoadSink = 11; } message LiveServerInfo { http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d03e809..4e6465d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Addressing; @@ -1120,6 +1121,22 @@ public class HRegionServer extends HasThread implements } else { serverLoad.setInfoServerPort(-1); } + + // for the replicationLoad purpose. 
Only need to get from one service + // either source or sink will get the same info + ReplicationSourceService rsources = getReplicationSourceService(); + + if (rsources != null) { + // always refresh first to get the latest value + ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); + if (rLoad != null) { + serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); + for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) { + serverLoad.addReplLoadSource(rLS); + } + } + } + return serverLoad.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index 92ac823..25a27a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -22,11 +22,12 @@ import java.io.IOException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; /** - * Gateway to Cluster Replication. + * Gateway to Cluster Replication. * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. * One such application is a cross-datacenter * replication service that can keep two hbase clusters in sync. @@ -52,4 +53,9 @@ public interface ReplicationService { * Stops replication service. 
*/ void stopReplicationService(); + + /** + * Refresh and Get ReplicationLoad + */ + public ReplicationLoad refreshAndGetReplicationLoad(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index 0c9d016..37dc1dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -71,4 +71,21 @@ public class MetricsSink { mss.incrAppliedOps(batchSize); } + /** + * Get the Age of Last Applied Op + * @return ageOfLastAppliedOp + */ + public long getAgeOfLastAppliedOp() { + return mss.getLastAppliedOpAge(); + } + + /** + * Get the TimeStampOfLastAppliedOp. 
If no replication Op applied yet, the value is the timestamp + * at which hbase instance starts + * @return timeStampsOfLastAppliedOp; + */ + public long getTimeStampOfLastAppliedOp() { + return this.lastTimestampForAge; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index a734b9c..21296a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -36,6 +36,7 @@ public class MetricsSource { private long lastTimestamp = 0; private int lastQueueSize = 0; + private String id; private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; @@ -46,6 +47,7 @@ public class MetricsSource { * @param id Name of the source this class is monitoring */ public MetricsSource(String id) { + this.id = id; singleSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) .getSource(id); @@ -143,4 +145,36 @@ public class MetricsSource { globalSourceSource.decrSizeOfLogQueue(lastQueueSize); lastQueueSize = 0; } + + /** + * Get AgeOfLastShippedOp + * @return AgeOfLastShippedOp + */ + public Long getAgeOfLastShippedOp() { + return singleSourceSource.getLastShippedAge(); + } + + /** + * Get the sizeOfLogQueue + * @return sizeOfLogQueue + */ + public int getSizeOfLogQueue() { + return this.lastQueueSize; + } + + /** + * Get the timeStampsOfLastShippedOp + * @return lastTimestampForAge + */ + public long getTimeStampOfLastShippedOp() { + return 
lastTimestamp; + } + + /** + * Get the slave peer ID + * @return peerID + */ + public String getPeerID() { + return id; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 4729644..78bb92e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; @@ -66,7 +67,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ @InterfaceAudience.Private -public class Replication extends WALActionsListener.Base implements +public class Replication extends WALActionsListener.Base implements ReplicationSourceService, ReplicationSinkService { private static final Log LOG = LogFactory.getLog(Replication.class); @@ -82,6 +83,8 @@ public class Replication extends WALActionsListener.Base implements /** Statistics thread schedule pool */ private ScheduledExecutorService scheduleThreadPool; private int statsThreadPeriod; + // ReplicationLoad to access replication metrics + private ReplicationLoad replicationLoad; /** * Instantiate the replication management (if rep is enabled). 
@@ -138,11 +141,13 @@ public class Replication extends WALActionsListener.Base implements this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); + this.replicationLoad = new ReplicationLoad(); } else { this.replicationManager = null; this.replicationQueues = null; this.replicationPeers = null; this.replicationTracker = null; + this.replicationLoad = null; } } @@ -310,4 +315,29 @@ public class Replication extends WALActionsListener.Base implements } } } + + @Override + public ReplicationLoad refreshAndGetReplicationLoad() { + if (this.replicationLoad == null) { + return null; + } + // always build for latest data + buildReplicationLoad(); + return this.replicationLoad; + } + + private void buildReplicationLoad() { + // get source + List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); + List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>(); + + for (ReplicationSourceInterface source : sources) { + if (source instanceof ReplicationSource) { + sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + } + } + // get sink + MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); + this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java new file mode 100644 index 0000000..b3f3ecb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -0,0 +1,151 @@ +/** + * Copyright 2014 The Apache 
Software Foundation Licensed to the Apache Software Foundation (ASF) + * under one or more contributor license agreements. See the NOTICE file distributed with this work + * for additional information regarding copyright ownership. The ASF licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in + * writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific + * language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Date; +import java.util.List; +import java.util.ArrayList; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Strings; + +/** + * This class is used for exporting some of the info from replication metrics + */ +@InterfaceAudience.Private +public class ReplicationLoad { + + // Empty load instance. 
+ public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad(); + + private List<MetricsSource> sourceMetricsList; + private MetricsSink sinkMetrics; + + private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList; + private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink; + + /** default constructor */ + public ReplicationLoad() { + super(); + } + + /** + * buildReplicationLoad + * @param srMetricsList + * @param skMetrics + */ + + public void buildReplicationLoad(final List<MetricsSource> srMetricsList, + final MetricsSink skMetrics) { + this.sourceMetricsList = srMetricsList; + this.sinkMetrics = skMetrics; + + // build the SinkLoad + ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild = + ClusterStatusProtos.ReplicationLoadSink.newBuilder(); + rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp()); + rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimeStampOfLastAppliedOp()); + this.replicationLoadSink = rLoadSinkBuild.build(); + + // build the SourceLoad List + this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>(); + for (MetricsSource sm : this.sourceMetricsList) { + long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); + int sizeOfLogQueue = sm.getSizeOfLogQueue(); + long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp(); + long replicationLag; + long timePassedAfterLastShippedOp = + EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; + if (sizeOfLogQueue != 0) { + // err on the large side + replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); + } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { + replicationLag = ageOfLastShippedOp; // last shipped happen recently + } else { + // last shipped may happen last night, + // so NO real lag although ageOfLastShippedOp is non-zero + replicationLag = 0; + } + + ClusterStatusProtos.ReplicationLoadSource.Builder 
rLoadSourceBuild = + ClusterStatusProtos.ReplicationLoadSource.newBuilder(); + rLoadSourceBuild.setPeerID(sm.getPeerID()); + rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp); + rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); + rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); + rLoadSourceBuild.setReplicationLag(replicationLag); + + this.replicationLoadSourceList.add(rLoadSourceBuild.build()); + } + + } + + /** + * sourceToString + * @return a string contains sourceReplicationLoad information + */ + public String sourceToString() { + if (this.sourceMetricsList == null) return null; + + StringBuilder sb = new StringBuilder(); + + for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) { + + sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID()); + sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp()); + sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue()); + sb = + Strings.appendKeyValue(sb, "TimeStampsOfLastShippedOp", + (new Date(rls.getTimeStampOfLastShippedOp()).toString())); + sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag()); + } + + return sb.toString(); + } + + /** + * sinkToString + * @return a string contains sinkReplicationLoad information + */ + public String sinkToString() { + if (this.replicationLoadSink == null) return null; + + StringBuilder sb = new StringBuilder(); + sb = + Strings.appendKeyValue(sb, "AgeOfLastAppliedOp", + this.replicationLoadSink.getAgeOfLastAppliedOp()); + sb = + Strings.appendKeyValue(sb, "TimeStampsOfLastAppliedOp", + (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString())); + + return sb.toString(); + } + + public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() { + return this.replicationLoadSink; + } + + public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList() { + return this.replicationLoadSourceList; + } + + 
/** + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 9a60131..3276418 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -254,4 +254,12 @@ public class ReplicationSink { "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + ", total replicated edits: " + this.totalReplicatedEdits; } + + /** + * Get replication Sink Metrics + * @return MetricsSink + */ + public MetricsSink getSinkMetrics() { + return this.metrics; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 6e2ef2d..ccde5c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -868,4 +868,12 @@ public class ReplicationSource extends Thread ", currently replicating from: 
" + this.currentPath + " at position: " + position; } + + /** + * Get Replication Source Metrics + * @return sourceMetrics + */ + public MetricsSource getSourceMetrics() { + return this.metrics; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 4163b66..d8d735f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -31,13 +31,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -48,6 +51,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import 
org.apache.hadoop.hbase.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.Replication; @@ -556,4 +560,45 @@ public class TestReplicationSmallTests extends TestReplicationBase { hadmin.close(); } + /** + * Test for HBASE-9531 + * put a few rows into htable1, which should be replicated to htable2 + * create a ClusterStatus instance 'status' from HBaseAdmin + * test : status.getLoad(server).getReplicationLoadSourceList() + * test : status.getLoad(server).getReplicationLoadSink() + * * @throws Exception + */ + @Test(timeout = 300000) + public void testReplicationStatus() throws Exception { + LOG.info("testReplicationStatus"); + + try (Admin admin = utility1.getConnection().getAdmin()) { + + final byte[] qualName = Bytes.toBytes("q"); + Put p; + + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + p = new Put(Bytes.toBytes("row" + i)); + p.add(famName, qualName, Bytes.toBytes("val" + i)); + htable1.put(p); + } + + ClusterStatus status = admin.getClusterStatus(); + + for (ServerName server : status.getServers()) { + ServerLoad sl = status.getLoad(server); + List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList(); + ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink(); + + // check SourceList has at least one entry + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0)); + + // check Sink exist only as it is difficult to verify the value on the fly + assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ", + (rLoadSink.getAgeOfLastAppliedOp() >= 0)); + assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ", + (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0)); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-shell/src/main/ruby/hbase/admin.rb 
---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index c0ea862..35ee36c 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -608,7 +608,7 @@ module Hbase end end - def status(format) + def status(format, type) status = @admin.getClusterStatus() if format == "detailed" puts("version %s" % [ status.getHBaseVersion() ]) @@ -635,6 +635,46 @@ module Hbase for server in status.getDeadServerNames() puts(" %s" % [ server ]) end + elsif format == "replication" + #check whether replication is enabled or not + if (!@admin.getConfiguration().getBoolean(org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_KEY, + org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_DEFAULT)) + puts("Please enable replication first.") + else + puts("version %s" % [ status.getHBaseVersion() ]) + puts("%d live servers" % [ status.getServersSize() ]) + for server in status.getServers() + sl = status.getLoad(server) + rSinkString = " SINK :" + rSourceString = " SOURCE:" + rLoadSink = sl.getReplicationLoadSink() + rSinkString << " AgeOfLastAppliedOp=" + rLoadSink.getAgeOfLastAppliedOp().to_s + rSinkString << ", TimeStampsOfLastAppliedOp=" + + (java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp())).toString() + rLoadSourceList = sl.getReplicationLoadSourceList() + index = 0 + while index < rLoadSourceList.size() + rLoadSource = rLoadSourceList.get(index) + rSourceString << " PeerID=" + rLoadSource.getPeerID() + rSourceString << ", AgeOfLastShippedOp=" + rLoadSource.getAgeOfLastShippedOp().to_s + rSourceString << ", SizeOfLogQueue=" + rLoadSource.getSizeOfLogQueue().to_s + rSourceString << ", TimeStampsOfLastShippedOp=" + + (java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp())).toString() + rSourceString << ", Replication Lag=" + rLoadSource.getReplicationLag().to_s + index = index + 1 + end + puts(" %s:" % + [ 
server.getHostname() ]) + if type.casecmp("SOURCE") == 0 + puts("%s" % rSourceString) + elsif type.casecmp("SINK") == 0 + puts("%s" % rSinkString) + else + puts("%s" % rSourceString) + puts("%s" % rSinkString) + end + end + end elsif format == "simple" load = 0 regions = 0 http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-shell/src/main/ruby/shell/commands/status.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/status.rb b/hbase-shell/src/main/ruby/shell/commands/status.rb index f72c13c..b22b272 100644 --- a/hbase-shell/src/main/ruby/shell/commands/status.rb +++ b/hbase-shell/src/main/ruby/shell/commands/status.rb @@ -22,18 +22,21 @@ module Shell class Status < Command def help return <<-EOF -Show cluster status. Can be 'summary', 'simple', or 'detailed'. The +Show cluster status. Can be 'summary', 'simple', 'detailed', or 'replication'. The default is 'summary'. Examples: hbase> status hbase> status 'simple' hbase> status 'summary' hbase> status 'detailed' + hbase> status 'replication' + hbase> status 'replication', 'source' + hbase> status 'replication', 'sink' EOF end - def command(format = 'summary') - admin.status(format) + def command(format = 'summary',type = 'both') + admin.status(format, type) end end end http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-shell/src/test/ruby/hbase/admin_test.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb index caede3a..1925864 100644 --- a/hbase-shell/src/test/ruby/hbase/admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb @@ -356,5 +356,17 @@ module Hbase assert_not_equal(nil, table) table.close end + + define_test "Get replication status" do + replication_status("replication", "both") + end + + define_test "Get replication source metrics information" do + 
replication_status("replication", "source") + end + + define_test "Get replication sink metrics information" do + replication_status("replication", "sink") + end end end http://git-wip-us.apache.org/repos/asf/hbase/blob/c0e26c3a/hbase-shell/src/test/ruby/test_helper.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/test/ruby/test_helper.rb b/hbase-shell/src/test/ruby/test_helper.rb index 5579761..5dfafc5 100644 --- a/hbase-shell/src/test/ruby/test_helper.rb +++ b/hbase-shell/src/test/ruby/test_helper.rb @@ -94,6 +94,10 @@ module Hbase puts "IGNORING DROP TABLE ERROR: #{e}" end end + + def replication_status(format,type) + return admin.status(format,type) + end end end
