This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new ff38218 HBASE-25627: HBase replication should have a metric to
represent if the source is stuck getting initialized (#3018)
ff38218 is described below
commit ff3821814aa79248ec82f1c07f9d624e7dfb2334
Author: Sandeep Pal <[email protected]>
AuthorDate: Wed Mar 17 09:10:44 2021 -0700
HBASE-25627: HBase replication should have a metric to represent if the
source is stuck getting initialized (#3018)
Introduces a new metric that tracks the number of replication sources
currently in initialization, so that a source stuck there can be detected.
Signed-off-by: Xu Cang <[email protected]>
Signed-off-by: Bharath Vissapragada <[email protected]>
---
.../MetricsReplicationGlobalSourceSourceImpl.java | 20 +++++++++++-
.../MetricsReplicationSourceSource.java | 7 +++--
.../MetricsReplicationSourceSourceImpl.java | 21 +++++++++++++
.../metrics2/lib/DynamicMetricsRegistry.java | 36 ++++++++++++++++++++--
.../replication/regionserver/MetricsSource.java | 36 ++++++++++++++++++----
.../regionserver/ReplicationSource.java | 23 +++++++++-----
.../regionserver/TestReplicationSource.java | 33 +++++++++++++++++++-
7 files changed, 157 insertions(+), 19 deletions(-)
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index d4cc1e3..0cc9f9fd 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -19,13 +19,14 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class MetricsReplicationGlobalSourceSourceImpl
- implements MetricsReplicationGlobalSourceSource {
+ implements MetricsReplicationGlobalSourceSource {
private static final String KEY_PREFIX = "source.";
private final MetricsReplicationSourceImpl rms;
@@ -49,6 +50,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
private final MutableGaugeLong walReaderBufferUsageBytes;
+ private final MutableGaugeInt sourceInitializing;
public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl
rms) {
this.rms = rms;
@@ -89,6 +91,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
walReaderBufferUsageBytes = rms.getMetricsRegistry()
.getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
+ sourceInitializing =
rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
}
@Override public void setLastShippedAge(long age) {
@@ -199,6 +202,21 @@ public class MetricsReplicationGlobalSourceSourceImpl
}
@Override
+ public void incrSourceInitializing() {
+ sourceInitializing.incr(1);
+ }
+
+ @Override
+ public void decrSourceInitializing() {
+ sourceInitializing.decr(1);
+ }
+
+ @Override
+ public int getSourceInitializing() {
+ return sourceInitializing.value();
+ }
+
+ @Override
public void init() {
rms.init();
}
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index ac396af..490aec9 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -49,8 +49,8 @@ public interface MetricsReplicationSourceSource extends
BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES =
"source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES =
"source.failedRecoverQueues";
- /* Used to track the age of oldest wal in ms since its creation time */
- String OLDEST_WAL_AGE = "source.oldestWalAge";
+ // This is to track the num of replication sources getting initialized
+ public static final String SOURCE_INITIALIZING = "source.numInitializing";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -80,4 +80,7 @@ public interface MetricsReplicationSourceSource extends
BaseSource {
long getEditsFiltered();
void setOldestWalAge(long age);
long getOldestWalAge();
+ void incrSourceInitializing();
+ void decrSourceInitializing();
+ int getSourceInitializing();
}
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 2ceb77b..18d536a 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +41,7 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;
+ private final String sourceInitializingKey;
private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -67,6 +69,7 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;
+ private final MutableGaugeInt sourceInitializing;
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms,
String id) {
this.rms = rms;
@@ -126,6 +129,9 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
+
+ sourceInitializingKey = this.keyPrefix + "isInitializing";
+ sourceInitializing =
rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
}
@Override public void setLastShippedAge(long age) {
@@ -189,6 +195,7 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
+ rms.removeMetric(sourceInitializingKey);
}
@Override
@@ -263,6 +270,20 @@ public class MetricsReplicationSourceSourceImpl implements
MetricsReplicationSou
}
@Override
+ public void incrSourceInitializing() {
+ sourceInitializing.incr(1);
+ }
+
+ @Override
+ public int getSourceInitializing() {
+ return sourceInitializing.value();
+ }
+
+ @Override public void decrSourceInitializing() {
+ sourceInitializing.decr(1);
+ }
+
+ @Override
public void init() {
rms.init();
}
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
index 7e17ee9..7a791c9 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.metrics2.lib;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsInfo;
@@ -30,7 +29,6 @@ import org.apache.hadoop.metrics2.impl.MsInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -453,6 +451,40 @@ public class DynamicMetricsRegistry {
}
/**
+ * Get a MetricMutableGaugeInt from the storage. If it is not there
atomically put it.
+ *
+ * @param gaugeName name of the gauge to create or get.
+ * @param potentialStartingValue value of the new gauge if we have to create
it.
+ */
+ public MutableGaugeInt getGaugeInt(String gaugeName, int
potentialStartingValue) {
+ //Try and get the guage.
+ MutableMetric metric = metricsMap.get(gaugeName);
+
+ //If it's not there then try and put a new one in the storage.
+ if (metric == null) {
+ //Create the potential new gauge.
+ MutableGaugeInt newGauge = new MutableGaugeInt(new
MetricsInfoImpl(gaugeName, ""),
+ potentialStartingValue);
+
+ // Try and put the gauge in. This is atomic.
+ metric = metricsMap.putIfAbsent(gaugeName, newGauge);
+
+ //If the value we get back is null then the put was successful and we
will return that.
+ //otherwise gaugeInt should contain the thing that was in before the put
could be completed.
+ if (metric == null) {
+ return newGauge;
+ }
+ }
+
+ if (!(metric instanceof MutableGaugeInt)) {
+ throw new MetricsException("Metric already exists in registry for metric
name: " + gaugeName +
+ " and not of type MetricMutableGaugeInr");
+ }
+
+ return (MutableGaugeInt) metric;
+ }
+
+ /**
* Get a MetricMutableCounterLong from the storage. If it is not there
atomically put it.
*
* @param counterName Name of the counter to get
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 4ef98d7..a91963e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -21,16 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.metrics.BaseSource;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* This class is for maintaining the various replication statistics for a
source and publishing them
@@ -62,7 +61,8 @@ public class MetricsSource implements BaseSource {
singleSourceSource =
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
.getSource(id);
- globalSourceSource =
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+ globalSourceSource = CompatibilitySingletonFactory
+ .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
singleSourceSourceByTable = new HashMap<>();
}
@@ -169,6 +169,22 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Increment the count for initializing sources
+ */
+ public void incrSourceInitializing() {
+ singleSourceSource.incrSourceInitializing();
+ globalSourceSource.incrSourceInitializing();
+ }
+
+ /**
+ * Decrement the count for initializing sources
+ */
+ public void decrSourceInitializing() {
+ singleSourceSource.decrSourceInitializing();
+ globalSourceSource.decrSourceInitializing();
+ }
+
+ /**
* Add on the the number of log edits read
*
* @param delta the number of log edits read.
@@ -325,6 +341,14 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Get the source initializing counts
+ * @return number of replication sources getting initialized
+ */
+ public int getSourceInitializing() {
+ return singleSourceSource.getSourceInitializing();
+ }
+
+ /**
* Get the slave peer ID
* @return peerID
*/
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f3fda67..9baf954 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -535,7 +535,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
sleepMultiplier++;
} else {
retryStartup.set(!this.abortOnError);
- this.startupOngoing.set(false);
+ setSourceStartupStatus(false);
throw new RuntimeException("Exhausted retries to start replication
endpoint.");
}
}
@@ -543,7 +543,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
if (!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
- this.startupOngoing.set(false);
+ setSourceStartupStatus(false);
throw new IllegalStateException("Source should be active.");
}
@@ -567,7 +567,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
if(!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
- this.startupOngoing.set(false);
+ setSourceStartupStatus(false);
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to
cluster={}",
@@ -578,7 +578,16 @@ public class ReplicationSource implements
ReplicationSourceInterface {
for (String walGroupId: logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
- this.startupOngoing.set(false);
+ setSourceStartupStatus(false);
+ }
+
+ private synchronized void setSourceStartupStatus(boolean initializing) {
+ startupOngoing.set(initializing);
+ if (initializing) {
+ metrics.incrSourceInitializing();
+ } else {
+ metrics.decrSourceInitializing();
+ }
}
@Override
@@ -587,7 +596,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
return this;
}
this.sourceRunning = true;
- startupOngoing.set(true);
+ setSourceStartupStatus(true);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
@@ -601,12 +610,12 @@ public class ReplicationSource implements
ReplicationSourceInterface {
do {
if(retryStartup.get()) {
this.sourceRunning = true;
- startupOngoing.set(true);
+ setSourceStartupStatus(true);
retryStartup.set(false);
try {
initialize();
} catch(Throwable error){
- sourceRunning = false;
+ setSourceStartupStatus(false);
uncaughtException(t, error, null, null);
retryStartup.set(!this.abortOnError);
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 86a71c9..f5d4f77 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -455,7 +455,7 @@ public class TestReplicationSource {
@Override
public synchronized UUID getPeerUUID() {
- if(count==0) {
+ if (count==0) {
count++;
throw new RuntimeException();
} else {
@@ -465,6 +465,18 @@ public class TestReplicationSource {
}
+ /**
+ * Bad Endpoint with failing connection to peer on demand.
+ */
+ public static class BadReplicationEndpoint extends
DoNothingReplicationEndpoint {
+ static boolean failing = true;
+
+ @Override
+ public synchronized UUID getPeerUUID() {
+ return failing ? null : super.getPeerUUID();
+ }
+ }
+
public static class FaultyReplicationEndpoint extends
DoNothingReplicationEndpoint {
static int count = 0;
@@ -554,6 +566,25 @@ public class TestReplicationSource {
}
}
+ @Test
+ public void testReplicationSourceInitializingMetric() throws IOException {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setBoolean("replication.source.regionserver.abort", false);
+ ReplicationSource rs = new ReplicationSource();
+ RegionServerServices rss = setupForAbortTests(rs, conf,
+ BadReplicationEndpoint.class.getName());
+ try {
+ rs.startup();
+ assertTrue(rs.isSourceActive());
+ Waiter.waitFor(conf, 1000, () ->
rs.getSourceMetrics().getSourceInitializing() == 1);
+ BadReplicationEndpoint.failing = false;
+ Waiter.waitFor(conf, 1000, () ->
rs.getSourceMetrics().getSourceInitializing() == 0);
+ } finally {
+ rs.terminate("Done");
+ rss.stop("Done");
+ }
+ }
+
/**
* Test ReplicationSource keeps retrying startup indefinitely without
blocking the main thread,
* when <b>eplication.source.regionserver.abort</b> is set to false.