Repository: hbase Updated Branches: refs/heads/branch-1.1 9b03a6b33 -> b437a61db
HBASE-16870 Add the metrics of replication sources which were transformed from other dead rs to ReplicationLoad Signed-off-by: zhangduo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b437a61d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b437a61d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b437a61d Branch: refs/heads/branch-1.1 Commit: b437a61db3e7d9e82a56df3d8fc20a8f4ed55cc8 Parents: 9b03a6b Author: Guanghao Zhang <[email protected]> Authored: Thu Oct 20 10:37:31 2016 +0800 Committer: zhangduo <[email protected]> Committed: Sat Oct 22 14:22:47 2016 +0800 ---------------------------------------------------------------------- .../replication/regionserver/MetricsSource.java | 2 +- .../replication/regionserver/Replication.java | 13 ++- .../regionserver/ReplicationLoad.java | 26 ++++- .../regionserver/ReplicationSourceManager.java | 6 ++ .../hbase/replication/TestReplicationBase.java | 3 +- .../replication/TestReplicationSmallTests.java | 42 --------- .../replication/TestReplicationStatus.java | 99 ++++++++++++++++++++ 7 files changed, 141 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b437a61d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 8cd0d60..939d48c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -62,7 +62,7 @@ public class MetricsSource { public void setAgeOfLastShippedOp(long timestamp) { long age = EnvironmentEdgeManager.currentTime() - timestamp; singleSourceSource.setLastShippedAge(age); - globalSourceSource.setLastShippedAge(age); + globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge())); this.lastTimestamp = timestamp; } http://git-wip-us.apache.org/repos/asf/hbase/blob/b437a61d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 78bb92e..6d261dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -327,15 +327,24 @@ public class Replication extends WALActionsListener.Base implements } private void buildReplicationLoad() { - // get source - List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>(); + // get source + List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); for (ReplicationSourceInterface source : sources) { if (source instanceof ReplicationSource) { sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); } } + + // get old source + List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources(); + for (ReplicationSourceInterface source : oldSources) { + if (source instanceof ReplicationSource) { + sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + } + } + // get sink MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); http://git-wip-us.apache.org/repos/asf/hbase/blob/b437a61d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 8dd42bc..2ead3df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.ArrayList; +import java.util.Map; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; @@ -66,8 +68,14 @@ public class ReplicationLoad { this.replicationLoadSink = rLoadSinkBuild.build(); // build the SourceLoad List - this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>(); + Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap = + new HashMap<String, ClusterStatusProtos.ReplicationLoadSource>(); for (MetricsSource sm : this.sourceMetricsList) { + // Get the actual peer id + String peerId = sm.getPeerID(); + String[] parts = peerId.split("-", 2); + peerId = parts.length != 1 ? parts[0] : peerId; + long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp(); @@ -85,17 +93,27 @@ public class ReplicationLoad { replicationLag = 0; } + ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); + if (rLoadSource != null) { + ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp); + sizeOfLogQueue += rLoadSource.getSizeOfLogQueue(); + timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(), + timeStampOfLastShippedOp); + replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag); + } + ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild = ClusterStatusProtos.ReplicationLoadSource.newBuilder(); - rLoadSourceBuild.setPeerID(sm.getPeerID()); + rLoadSourceBuild.setPeerID(peerId); rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp); rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); rLoadSourceBuild.setReplicationLag(replicationLag); - this.replicationLoadSourceList.add(rLoadSourceBuild.build()); + replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build()); } - + this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>( + replicationLoadSourceMap.values()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/b437a61d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 251b5e1..9062b87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -459,6 +459,9 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeRecoveredQueue(ReplicationSourceInterface src) { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); + if (src instanceof ReplicationSource) { + ((ReplicationSource) src).getSourceMetrics().clear(); + } this.oldsources.remove(src); deleteSource(src.getPeerClusterZnode(), false); this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); @@ -505,6 +508,9 @@ public class ReplicationSourceManager implements ReplicationListener { } for (ReplicationSourceInterface toRemove : srcToRemove) { toRemove.terminate(terminateMessage); + if (toRemove instanceof ReplicationSource) { + ((ReplicationSource) toRemove).getSourceMetrics().clear(); + } this.sources.remove(toRemove); } deleteSource(id, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/b437a61d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index ad9b227..f8053ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -66,6 +66,7 @@ public class TestReplicationBase { protected static HBaseTestingUtility utility1; protected static HBaseTestingUtility utility2; + protected static final String PEER_ID = "2"; protected static final int NB_ROWS_IN_BATCH = 100; protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10; @@ -121,7 +122,7 @@ public class TestReplicationBase { utility2.setZkCluster(miniZK); zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true); - admin.addPeer("2", utility2.getClusterKey()); + admin.addPeer(PEER_ID, utility2.getClusterKey()); LOG.info("Setup second Zk"); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); http://git-wip-us.apache.org/repos/asf/hbase/blob/b437a61d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 06f78e3..f14821a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -559,46 +559,4 @@ public class TestReplicationSmallTests extends TestReplicationBase { hadmin.close(); } - - /** - * Test for HBASE-9531 - * put a few rows into htable1, which should be replicated to htable2 - * create a ClusterStatus instance 'status' from HBaseAdmin - * test : status.getLoad(server).getReplicationLoadSourceList() - * test : status.getLoad(server).getReplicationLoadSink() - * * @throws Exception - */ - @Test(timeout = 300000) - public void testReplicationStatus() throws Exception { - LOG.info("testReplicationStatus"); - - try (Admin admin = utility1.getConnection().getAdmin()) { - - final byte[] qualName = Bytes.toBytes("q"); - Put p; - - for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { - p = new Put(Bytes.toBytes("row" + i)); - p.add(famName, qualName, Bytes.toBytes("val" + i)); - htable1.put(p); - } - - ClusterStatus status = admin.getClusterStatus(); - - for (ServerName server : status.getServers()) { - ServerLoad sl = status.getLoad(server); - List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList(); - ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink(); - - // check SourceList has at least one entry - assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0)); - - // check Sink exist only as it is difficult to verify the value on the fly - assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ", - (rLoadSink.getAgeOfLastAppliedOp() >= 0)); - assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ", - (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0)); - } - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b437a61d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java new file mode 100644 index 0000000..c5de3ed --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestReplicationStatus extends TestReplicationBase { + private static final Log LOG = LogFactory.getLog(TestReplicationStatus.class); + + /** + * Test for HBASE-9531 + * put a few rows into htable1, which should be replicated to htable2 + * create a ClusterStatus instance 'status' from HBaseAdmin + * test : status.getLoad(server).getReplicationLoadSourceList() + * test : status.getLoad(server).getReplicationLoadSink() + * * @throws Exception + */ + @Test(timeout = 300000) + public void testReplicationStatus() throws Exception { + LOG.info("testReplicationStatus"); + + try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { + // disable peer + admin.disablePeer(PEER_ID); + + final byte[] qualName = Bytes.toBytes("q"); + Put p; + + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + p = new Put(Bytes.toBytes("row" + i)); + p.add(famName, qualName, Bytes.toBytes("val" + i)); + htable1.put(p); + } + + ClusterStatus status = hbaseAdmin.getClusterStatus(); + + for (ServerName server : status.getServers()) { + ServerLoad sl = status.getLoad(server); + List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList(); + ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink(); + + // check SourceList only has one entry, beacuse only has one peer + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); + + // check Sink exist only as it is difficult to verify the value on the fly + assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ", + (rLoadSink.getAgeOfLastAppliedOp() >= 0)); + assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ", + (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0)); + } + + // Stop rs1, then the queue of rs1 will be transfered to rs0 + utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer"); + Thread.sleep(5000); + status = hbaseAdmin.getClusterStatus(); + ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + ServerLoad sl = status.getLoad(server); + List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList(); + // check SourceList still only has one entry + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); + } finally { + admin.enablePeer(PEER_ID); + utility1.getHBaseCluster().getRegionServer(1).start(); + } + } +}
