This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f2e68c8 fix TestRawZkClient unstableness (#1295)
f2e68c8 is described below
commit f2e68c83175de01bb5613e7742d498ccd1264ebc
Author: kaisun2000 <[email protected]>
AuthorDate: Thu Oct 1 17:28:28 2020 -0700
fix TestRawZkClient unstableness (#1295)
ZkClient connected to ZooKeeper before its monitor bean was initialized and
registered. This caused a race condition that made the metrics test fail. Fix
this issue by constructing the monitor before connecting and registering it afterwards.
Co-authored-by: Kai Sun <[email protected]>
---
.../apache/helix/zookeeper/zkclient/ZkClient.java | 29 ++++++++++-----
.../zookeeper/zkclient/metric/ZkClientMonitor.java | 43 +++++++++++++++++-----
.../zkclient/metric/ZkClientPathMonitor.java | 1 -
.../apache/helix/zookeeper/impl/TestHelper.java | 2 +-
.../zookeeper/impl/client/TestRawZkClient.java | 38 +++++++++++--------
5 files changed, 77 insertions(+), 36 deletions(-)
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 180fcdd..2f26f8b 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -222,20 +222,22 @@ public class ZkClient implements Watcher {
_asyncCallRetryThread.start();
LOG.debug("ZkClient created with uid {}, _asyncCallRetryThread id {}",
_uid, _asyncCallRetryThread.getId());
+ if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null &&
!monitorType
+ .isEmpty()) {
+ _monitor =
+ new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName,
monitorRootPathOnly,
+ _eventThread);
+ } else {
+ LOG.info("ZkClient monitor key or type is not provided. Skip
monitoring.");
+ }
+
connect(connectionTimeout, this);
- // initiate monitor
try {
- if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null
&& !monitorType
- .isEmpty()) {
- _monitor =
- new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName,
monitorRootPathOnly,
- _eventThread);
+ if (_monitor != null) {
_monitor.register();
- } else {
- LOG.info("ZkClient monitor key or type is not provided. Skip
monitoring.");
}
- } catch (JMException e) {
+ } catch (JMException e){
LOG.error("Error in creating ZkClientMonitor", e);
}
}
@@ -1284,6 +1286,7 @@ public class ZkClient implements Watcher {
});
}
+
/*
* Note, issueSync takes a ZooKeeper (client) object and pass it to
doAsyncSync().
* The reason we do this is that we want to ensure each new session event
is preceded with exactly
@@ -2157,6 +2160,14 @@ public class ZkClient implements Watcher {
IZkConnection zkConnection = getConnection();
_eventThread = new ZkEventThread(zkConnection.getServers());
+
+ if (_monitor != null) {
+ boolean result = _monitor.setAndInitZkEventThreadMonitor(_eventThread);
+ if (!result) {
+ LOG.error("register _eventThread monitor failed due to an existing
one");
+ }
+ }
+
_eventThread.start();
LOG.debug("ZkClient {}, _eventThread {}", _uid, _eventThread.getId());
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index 6717472..6bfc747 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -38,9 +38,12 @@ import
org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
import org.apache.helix.monitoring.mbeans.exception.MetricException;
import org.apache.helix.zookeeper.zkclient.ZkEventThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZkClientMonitor extends DynamicMBeanProvider {
+
public static final String MONITOR_TYPE = "Type";
public static final String MONITOR_KEY = "Key";
protected static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client
Monitor";
@@ -83,8 +86,22 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
_expiredSessionCounter = new SimpleDynamicMetric("ExpiredSessionCounter",
0l);
_dataChangeEventCounter = new
SimpleDynamicMetric("DataChangeEventCounter", 0l);
_outstandingRequestGauge = new
SimpleDynamicMetric("OutstandingRequestGauge", 0l);
+
if (zkEventThread != null) {
- _zkEventThreadMetric = new ZkThreadMetric(zkEventThread);
+ boolean result = setAndInitZkEventThreadMonitor(zkEventThread);
+ if (!result) {
+ _logger.error("register zkEventThreadMonitor failed due to an existing
one.");
+ }
+ }
+
+ for (ZkClientPathMonitor.PredefinedPath path :
ZkClientPathMonitor.PredefinedPath.values()) {
+ // If monitor root path only, check if the current path is Root.
+ // Otherwise, add monitors for every path.
+ if (!_monitorRootOnly ||
path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
+ _zkClientPathMonitorMap.put(path,
+ new ZkClientPathMonitor(path, _monitorType, _monitorKey,
_monitorInstanceName)
+ );
+ }
}
}
@@ -96,6 +113,14 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
(monitorKey + (monitorInstanceName == null ? "" : "." +
monitorInstanceName)));
}
+ public synchronized boolean setAndInitZkEventThreadMonitor(ZkEventThread
zkEventThread) {
+ if (_zkEventThreadMetric == null) {
+ _zkEventThreadMetric = new ZkThreadMetric(zkEventThread);
+ return true;
+ }
+ return false;
+ }
+
@Override
public DynamicMBeanProvider register() throws JMException {
List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
@@ -108,15 +133,15 @@ public class ZkClientMonitor extends DynamicMBeanProvider
{
}
doRegister(attributeList, MBEAN_DESCRIPTION,
getObjectName(_monitorType, _monitorKey, _monitorInstanceName));
- for (ZkClientPathMonitor.PredefinedPath path :
ZkClientPathMonitor.PredefinedPath.values()) {
- // If monitor root path only, check if the current path is Root.
- // Otherwise, add monitors for every path.
- if (!_monitorRootOnly ||
path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
- _zkClientPathMonitorMap.put(path,
- new ZkClientPathMonitor(path, _monitorType, _monitorKey,
_monitorInstanceName)
- .register());
+ _zkClientPathMonitorMap.values().stream().forEach( monitor -> {
+ if (monitor != null) {
+ try {
+ monitor.register();
+ } catch (JMException e) {
+ _logger.error(" {} failed registration", monitor, e);
+ }
}
- }
+ });
return this;
}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
index db3de03..1ab2653 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
@@ -33,7 +33,6 @@ import
org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
-
public class ZkClientPathMonitor extends DynamicMBeanProvider {
public static final String MONITOR_PATH = "PATH";
private final String _sensorName;
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
index 9a6ac02..515da6d 100644
---
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
@@ -32,7 +32,7 @@ import org.testng.Assert;
public class TestHelper {
private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
- public static final long WAIT_DURATION = 20 * 1000L; // 20 seconds
+ public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds
/**
* Returns a unused random port.
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index a21b714..535d760 100644
---
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -285,20 +285,26 @@ public class TestRawZkClient extends ZkTestBase {
Assert.assertTrue(beanServer.isRegistered(idealStatename));
Assert.assertEquals((long) beanServer.getAttribute(name,
"DataChangeEventCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(name,
"StateChangeEventCounter"), 0);
Assert.assertEquals((long) beanServer.getAttribute(name,
"ExpiredSessionCounter"), 0);
Assert.assertEquals((long) beanServer.getAttribute(name,
"OutstandingRequestGauge"), 0);
- // account for doAsyncSync()
- Assert.assertEquals((long) beanServer.getAttribute(name,
"TotalCallbackCounter"), 1);
- // Test exists
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadTotalLatencyCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadLatencyGauge.Max"), 0);
+ boolean verifyResult = TestHelper.verify(()->{
+ return (long) beanServer.getAttribute(rootname, "ReadCounter") == 1;
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(verifyResult, " did not see first sync() read");
+
+ Assert.assertEquals((long) beanServer.getAttribute(name,
"StateChangeEventCounter"), 1);
+
+ long firstLatencyCounter = (long) beanServer.getAttribute(rootname,
"ReadTotalLatencyCounter");
+ long firstReadLatencyGauge = (long) beanServer.getAttribute(rootname,
"ReadLatencyGauge.Max");
+ Assert.assertTrue(firstLatencyCounter >= 0);
+ Assert.assertTrue(firstReadLatencyGauge >= 0);
zkClient.exists(TEST_ROOT);
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 1);
- Assert.assertTrue((long) beanServer.getAttribute(rootname,
"ReadTotalLatencyCounter") >= 0);
- Assert.assertTrue((long) beanServer.getAttribute(rootname,
"ReadLatencyGauge.Max") >= 0);
+
+ Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadCounter")
== 2);
+
+ Assert.assertTrue((long) beanServer.getAttribute(rootname,
"ReadTotalLatencyCounter") >= firstLatencyCounter);
+ Assert.assertTrue((long) beanServer.getAttribute(rootname,
"ReadLatencyGauge.Max") >= firstReadLatencyGauge);
// Test create
Assert.assertEquals((long) beanServer.getAttribute(rootname,
"WriteCounter"), 0);
@@ -327,7 +333,7 @@ public class TestRawZkClient extends ZkTestBase {
Assert.assertTrue((long) beanServer.getAttribute(idealStatename,
"WriteLatencyGauge.Max") >= 0);
// Test read
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 1);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 2);
Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadBytesCounter"), 0);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadCounter"), 0);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadBytesCounter"), 0);
@@ -338,7 +344,7 @@ public class TestRawZkClient extends ZkTestBase {
Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadLatencyGauge.Max"), 0);
zkClient.readData(TEST_PATH, new Stat());
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 2);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 3);
Assert
.assertEquals((long) beanServer.getAttribute(rootname,
"ReadBytesCounter"), TEST_DATA_SIZE);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadCounter"), 1);
@@ -350,27 +356,27 @@ public class TestRawZkClient extends ZkTestBase {
>= origIdealStatesReadTotalLatencyCounter);
Assert.assertTrue((long) beanServer.getAttribute(idealStatename,
"ReadLatencyGauge.Max") >= 0);
zkClient.getChildren(TEST_PATH);
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 3);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 4);
Assert
.assertEquals((long) beanServer.getAttribute(rootname,
"ReadBytesCounter"), TEST_DATA_SIZE);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadCounter"), 2);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadBytesCounter"),
TEST_DATA_SIZE);
zkClient.getStat(TEST_PATH);
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 4);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 5);
Assert
.assertEquals((long) beanServer.getAttribute(rootname,
"ReadBytesCounter"), TEST_DATA_SIZE);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadCounter"), 3);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename,
"ReadBytesCounter"),
TEST_DATA_SIZE);
zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 5);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 6);
ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
new ZkAsyncCallbacks.ExistsCallbackHandler();
zkClient.asyncExists(TEST_PATH, callbackHandler);
callbackHandler.waitForSuccess();
- Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 6);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname,
"ReadCounter"), 7);
// Test write
zkClient.writeData(TEST_PATH, TEST_DATA);