This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 1758f2d Fix unstable test TestZkCallbackHandlerLeak. (#1734)
1758f2d is described below
commit 1758f2d3b40270205efd54eacc24f0ed8c13405a
Author: Jiajun Wang <[email protected]>
AuthorDate: Wed May 12 10:26:45 2021 -0700
Fix unstable test TestZkCallbackHandlerLeak. (#1734)
Extend the wait-and-check section of the test so that it waits for the
expected condition with more tolerance.
---
.../integration/TestZkCallbackHandlerLeak.java | 195 ++++++---------------
1 file changed, 54 insertions(+), 141 deletions(-)
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 7bc92dc..d9f8707 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -25,9 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.AccessOption;
-import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
@@ -43,7 +41,6 @@ import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.tools.ClusterStateVerifier;
import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -58,7 +55,6 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase
{
@Test
public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
@@ -106,39 +102,26 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
final MockParticipantManager participantManagerToExpire = participants[1];
// check controller zk-watchers
- boolean result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
- // Set<String> watchPaths = watchers.get("0x" +
controllerManager.getSessionId());
- Set<String> watchPaths = watchers.get("0x" +
controller.getSessionId());
- // System.out.println("controller watch paths: " + watchPaths);
+ boolean result = TestHelper.verify(() -> {
+ Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
+ Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
- // where n is number of nodes and r is number of resources
- return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * n);
- }
+ // where n is number of nodes and r is number of resources
+ return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * n);
}, 2000);
Assert.assertTrue(result, "Controller has incorrect number of
zk-watchers.");
// check participant zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" +
participantManagerToExpire.getSessionId());
- // System.out.println("participant watch paths: " + watchPaths);
+ result = TestHelper.verify(() -> {
+ Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
+ Set<String> watchPaths = watchers.get("0x" +
participantManagerToExpire.getSessionId());
- // participant should have 1 zk-watcher: 1 for MESSAGE
- return watchPaths.size() == 1;
- }
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}, 2000);
Assert.assertTrue(result, "Participant should have 1 zk-watcher.
MESSAGES->HelixTaskExecutor");
// check HelixManager#_handlers
- // printHandlers(controllerManager);
- // printHandlers(participantManagerToExpire);
int controllerHandlerNb = controller.getHandlers().size();
int particHandlerNb = participantManagerToExpire.getHandlers().size();
Assert.assertEquals(controllerHandlerNb, 8 + 4 * n,
@@ -162,39 +145,27 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
Assert.assertTrue(result);
// check controller zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" +
controller.getSessionId());
- // System.out.println("controller watch paths after session expiry: "
+ watchPaths);
+ result = TestHelper.verify(() -> {
+ Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
+ Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
- // where n is number of nodes and r is number of resources
- // one participant is disconnected, and its task current states are
removed
- return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * (n
- 1) + 6 + r);
- }
+ // where n is number of nodes and r is number of resources
+ // one participant is disconnected, and its task current states are
removed
+ return watchPaths.size() == (8 + r + (6 + r + taskResourceCount) * (n -
1) + 6 + r);
}, 2000);
Assert.assertTrue(result, "Controller has incorrect number of zk-watchers
after session expiry.");
// check participant zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" +
participantManagerToExpire.getSessionId());
- // System.out.println("participant watch paths after session expiry: "
+ watchPaths);
+ result = TestHelper.verify(() -> {
+ Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
+ Set<String> watchPaths = watchers.get("0x" +
participantManagerToExpire.getSessionId());
- // participant should have 1 zk-watcher: 1 for MESSAGE
- return watchPaths.size() == 1;
- }
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}, 2000);
Assert.assertTrue(result, "Participant should have 1 zk-watcher after
session expiry.");
// check handlers
- // printHandlers(controllerManager);
- // printHandlers(participantManagerToExpire);
int handlerNb = controller.getHandlers().size();
Assert.assertEquals(handlerNb, controllerHandlerNb,
"controller callback handlers should not increase after participant
session expiry");
@@ -214,7 +185,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
@Test
public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
@@ -261,17 +231,13 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
final MockParticipantManager participantManager = participants[0];
// wait until we get all the listeners registered
- TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- int controllerHandlerNb = controller.getHandlers().size();
- int particHandlerNb = participantManager.getHandlers().size();
- if (controllerHandlerNb == 10 && particHandlerNb == 2)
- return true;
- else
- return false;
- }
+ TestHelper.verify(() -> {
+ int controllerHandlerNb = controller.getHandlers().size();
+ int particHandlerNb = participantManager.getHandlers().size();
+ if (controllerHandlerNb == 10 && particHandlerNb == 2)
+ return true;
+ else
+ return false;
}, 1000);
int controllerHandlerNb = controller.getHandlers().size();
@@ -296,39 +262,29 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
Assert.assertTrue(verifier.verifyByPolling());
// check controller zk-watchers
- boolean result = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" +
controller.getSessionId());
- System.err.println("controller watch paths after session expiry: " +
watchPaths.size());
-
- // where r is number of resources and n is number of nodes
- // task resource count does not attribute to ideal state watch paths
- int expected = (8 + r + (6 + r + taskResourceCount) * n);
- return watchPaths.size() == expected;
- }
+ boolean result = TestHelper.verify(() -> {
+ Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
+ Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
+ System.err.println("controller watch paths after session expiry: " +
watchPaths.size());
+
+ // where r is number of resources and n is number of nodes
+ // task resource count does not attribute to ideal state watch paths
+ int expected = (8 + r + (6 + r + taskResourceCount) * n);
+ return watchPaths.size() == expected;
}, 2000);
Assert.assertTrue(result, "Controller has incorrect zk-watchers after
session expiry.");
// check participant zk-watchers
- result = TestHelper.verify(new TestHelper.Verifier() {
+ result = TestHelper.verify(() -> {
+ Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
+ Set<String> watchPaths = watchers.get("0x" +
participantManager.getSessionId());
- @Override
- public boolean verify() throws Exception {
- Map<String, Set<String>> watchers =
ZkTestHelper.getListenersBySession(ZK_ADDR);
- Set<String> watchPaths = watchers.get("0x" +
participantManager.getSessionId());
- // System.out.println("participant watch paths after session expiry: "
+ watchPaths);
-
- // participant should have 1 zk-watcher: 1 for MESSAGE
- return watchPaths.size() == 1;
- }
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}, 2000);
Assert.assertTrue(result, "Participant should have 1 zk-watcher after
session expiry.");
// check HelixManager#_handlers
- // printHandlers(controllerManager);
int handlerNb = controller.getHandlers().size();
Assert.assertEquals(handlerNb, controllerHandlerNb,
"controller callback handlers should not increase after participant
session expiry, but was "
@@ -400,7 +356,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
MockParticipantManager participantToExpire = participants[0];
String oldSessionId = participantToExpire.getSessionId();
- PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
// expire participant session; leaked callback handler used to be not
reset() and be removed from ZkClient
LOG.info("Expire participant: " + participantToExpire.getInstanceName() +
", session: "
@@ -471,14 +426,13 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
rpManager.syncStart();
RoutingTableProvider rp = new RoutingTableProvider(rpManager,
PropertyType.CURRENTSTATES);
- LOG.info("add job");
MockParticipantManager jobParticipant = participants[0];
String jobSessionId = jobParticipant.getSessionId();
- HelixDataAccessor jobAccesor = jobParticipant.getHelixDataAccessor();
+ HelixDataAccessor jobAccessor = jobParticipant.getHelixDataAccessor();
PropertyKey.Builder jobKeyBuilder = new PropertyKey.Builder(clusterName);
PropertyKey db0key =
jobKeyBuilder.currentState(jobParticipant.getInstanceName(),
jobSessionId, "TestDB0");
- CurrentState db0 = jobAccesor.getProperty(db0key);
+ CurrentState db0 = jobAccessor.getProperty(db0key);
PropertyKey jobKey =
jobKeyBuilder.currentState(jobParticipant.getInstanceName(),
jobSessionId, "BackupQueue");
CurrentState cs = new CurrentState("BackupQueue");
@@ -490,16 +444,16 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
LOG.info("add job");
for (int i = 0; i < mJobUpdateCnt; i++) {
- jobAccesor.setProperty(jobKey, cs);
+ jobAccessor.setProperty(jobKey, cs);
}
// verify new watcher is installed on the new node
- boolean result = TestHelper.verify(() -> {
- return
ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath());
- }, TestHelper.WAIT_DURATION);
- Assert.assertTrue(result, "Should get initial clusterConfig callback
invoked");
- rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
-
Assert.assertTrue(rpWatchPaths.get("dataWatches").contains(jobKey.getPath()));
+ boolean result = TestHelper.verify(
+ () ->
ZkTestHelper.getListenersByZkPath(ZK_ADDR).keySet().contains(jobKey.getPath())
+ &&
ZkTestHelper.getZkWatch(rpManager.getZkClient()).get("dataWatches")
+ .contains(jobKey.getPath()), TestHelper.WAIT_DURATION);
+ Assert.assertTrue(result,
+ "Should get initial clusterConfig callback invoked and add data
watchers");
LOG.info("remove job");
jobParticipant.getZkClient().delete(jobKey.getPath());
@@ -519,7 +473,7 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
- // rp.shutdown();
+ rp.shutdown();
rpManager.syncStop();
TestHelper.dropCluster(clusterName, _gZkClient);
@@ -553,16 +507,9 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
// register a controller listener on participant_0
if (i == 0) {
- // ZkHelixTestManager manager = participants[0].getManager();
MockParticipantManager manager = participants[0];
- manager.addCurrentStateChangeListener(new CurrentStateChangeListener()
{
- @Override
- public void onStateChange(String instanceName, List<CurrentState>
statesInfo,
- NotificationContext changeContext) {
- // To change body of implemented methods use File | Settings |
File Templates.
- // System.out.println(instanceName + " on current-state change,
type: " +
- // changeContext.getType());
- }
+ manager.addCurrentStateChangeListener((instanceName1, statesInfo,
changeContext) -> {
+ // To change body of implemented methods use File | Settings | File
Templates.
}, manager.getInstanceName(), manager.getSessionId());
}
}
@@ -586,7 +533,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
Map<String, Set<IZkChildListener>> childListeners =
ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
- // printZkListeners(participantToExpire.getZkClient());
Assert.assertEquals(dataListeners.size(), 1,
"Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1
data-listeners");
String path =
@@ -609,7 +555,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
// check zookeeper#watches on client side
Map<String, List<String>> watchPaths =
ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
- // System.out.println("localhost_12918 zk-client side watchPaths: " +
watchPaths + "\n");
Assert
.assertEquals(watchPaths.get("dataWatches").size(), 3,
"Should have 3 data-watches: CURRENTSTATE/{sessionId},
CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
@@ -640,7 +585,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
// check zkclient#listeners
dataListeners =
ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
childListeners =
ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
- // printZkListeners(participantToExpire.getZkClient());
Assert.assertTrue(dataListeners.isEmpty(), "Should have no
data-listeners");
Assert
.assertEquals(
@@ -660,7 +604,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
- // System.out.println("localhost_12918 zk-client side watchPaths: " +
watchPaths + "\n");
Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
"Should have 1 data-watches: MESSAGES");
Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
@@ -694,7 +637,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
- // System.out.println("localhost_12918 zk-client side watchPaths: " +
watchPaths + "\n");
Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
"Should have 1 data-watches: MESSAGES");
Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
@@ -705,8 +647,6 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
0,
"Should have no exist-watches. exist-watches on
CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be
cleared during handleNewSession");
- // Thread.sleep(1000);
-
// clean up
controller.syncStop();
for (int i = 0; i < n; i++) {
@@ -736,31 +676,4 @@ public class TestZkCallbackHandlerLeak extends
ZkUnitTestBase {
return sb.toString();
}
-
- void printZkListeners(HelixZkClient client) throws Exception {
- Map<String, Set<IZkDataListener>> datalisteners =
ZkTestHelper.getZkDataListener(client);
- Map<String, Set<IZkChildListener>> childListeners =
ZkTestHelper.getZkChildListener(client);
-
- System.out.println("dataListeners {");
- for (String path : datalisteners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkDataListener> set = datalisteners.get(path);
- for (IZkDataListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
-
- System.out.println("childListeners {");
- for (String path : childListeners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkChildListener> set = childListeners.get(path);
- for (IZkChildListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
- }
}