This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new d109c64 Fix the concurrent modification error that happens during the
HelixManager initHandlers() call (#904)
d109c64 is described below
commit d109c64834b1b288c937b9f9693b26e74487a244
Author: Jiajun Wang <[email protected]>
AuthorDate: Fri Mar 20 18:18:16 2020 -0700
Fix the concurrent modification error that happens during the HelixManager
initHandlers() call (#904)
Fix the concurrent modification error that happens during the HelixManager
initHandlers() call.
Add a test case to verify the fix and ensure this error does not happen again.
---
.../apache/helix/manager/zk/ZKHelixManager.java | 11 +--
...andleNewSession.java => TestHandleSession.java} | 94 ++++++++++++++++++++--
2 files changed, 92 insertions(+), 13 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index b4368f9..532ef44 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -1004,7 +1004,10 @@ public class ZKHelixManager implements HelixManager,
IZkStateListener {
void initHandlers(List<CallbackHandler> handlers) {
synchronized (this) {
if (handlers != null) {
- for (CallbackHandler handler : handlers) {
+ // get a copy of the list and iterate over the copy list
+ // in case handler.init() modifies the original handler list
+ List<CallbackHandler> tmpHandlers = new ArrayList<>(handlers);
+ for (CallbackHandler handler : tmpHandlers) {
handler.init();
LOG.info("init handler: " + handler.getPath() + ", " +
handler.getListener());
}
@@ -1016,10 +1019,8 @@ public class ZKHelixManager implements HelixManager,
IZkStateListener {
synchronized (this) {
if (_handlers != null) {
// get a copy of the list and iterate over the copy list
- // in case handler.reset() modify the original handler list
- List<CallbackHandler> tmpHandlers = new ArrayList<>();
- tmpHandlers.addAll(_handlers);
-
+ // in case handler.reset() modifies the original handler list
+ List<CallbackHandler> tmpHandlers = new ArrayList<>(_handlers);
for (CallbackHandler handler : tmpHandlers) {
handler.reset(isShutdown);
LOG.info("reset handler: " + handler.getPath() + ", " +
handler.getListener());
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
similarity index 84%
rename from helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
rename to helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
index 00bedcf..92eb3c4 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
@@ -22,31 +22,37 @@ package org.apache.helix.manager.zk;
import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
+import org.apache.helix.api.listeners.CurrentStateChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.model.LiveInstance;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestHandleNewSession extends ZkTestBase {
+public class TestHandleSession extends ZkTestBase {
+ private static final String _className = TestHelper.getTestClassName();
+
@Test
public void testHandleNewSession() throws Exception {
- String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
+ String clusterName = _className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
@@ -191,9 +197,8 @@ public class TestHandleNewSession extends ZkTestBase {
*/
@Test(timeOut = 5 * 60 * 1000L)
public void testDiscardExpiredSessions() throws Exception {
- final String className = TestHelper.getTestClassName();
final String methodName = TestHelper.getTestMethodName();
- final String clusterName = className + "_" + methodName;
+ final String clusterName = _className + "_" + methodName;
final ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<>(ZK_ADDR));
@@ -314,9 +319,8 @@ public class TestHandleNewSession extends ZkTestBase {
*/
@Test
public void testSessionExpiredWhenResetHandlers() throws Exception {
- final String className = TestHelper.getTestClassName();
final String methodName = TestHelper.getTestMethodName();
- final String clusterName = className + "_" + methodName;
+ final String clusterName = _className + "_" + methodName;
final ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<>(ZK_ADDR));
@@ -466,6 +470,78 @@ public class TestHandleNewSession extends ZkTestBase {
deleteCluster(clusterName);
}
+ class MockLiveInstanceChangeListener implements LiveInstanceChangeListener {
+ private final HelixManager _manager;
+ private final Set<String> _expectedLiveInstances;
+
+ public MockLiveInstanceChangeListener(HelixManager manager,
+ Set<String> expectedLiveInstanceNames) {
+ _manager = manager;
+ _expectedLiveInstances = expectedLiveInstanceNames;
+ }
+
+ @Override
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+ NotificationContext changeContext) {
+ if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
+ for (LiveInstance liveInstance : liveInstances) {
+ if (_expectedLiveInstances.contains(liveInstance.getInstanceName()))
{
+ try {
+ _manager.addCurrentStateChangeListener(
+ (CurrentStateChangeListener) (instanceName, statesInfo,
currentStateChangeContext) -> {
+ // empty callback
+ }, liveInstance.getInstanceName(),
liveInstance.getEphemeralOwner());
+ } catch (Exception e) {
+ throw new HelixException("Unexpected exception in the test
method.", e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentInitCallbackHandlers() throws Exception {
+ final String clusterName =
+ CLUSTER_PREFIX + "_" + _className + "_" +
TestHelper.getTestMethodName();
+ TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+ final String spectatorName = TestHelper.getTestMethodName() + "Spectator";
+ try {
+ BlockingHandleNewSessionZkHelixManager helixManager =
+ new BlockingHandleNewSessionZkHelixManager(clusterName,
spectatorName,
+ InstanceType.SPECTATOR, _gZkClient.getServers());
+ helixManager.connect();
+ // Add two mock listeners that will add more callback handlers while
handling INIT or CALLBACK event.
+ // Note that we have to test with 2 separate listeners so one of them
has a chance to fail if
+ // there is a concurrent modification exception.
+ helixManager.addLiveInstanceChangeListener(
+ new MockLiveInstanceChangeListener(helixManager,
Collections.singleton("localhost_1")));
+ helixManager.addLiveInstanceChangeListener(
+ new MockLiveInstanceChangeListener(helixManager,
Collections.singleton("localhost_2")));
+
+ // Session expire will trigger all callbacks to be init. And the
injected liveInstance
+ // listener will trigger more callbackhandlers to be registered during
the init process.
+ ZkTestHelper.asyncExpireSession(helixManager.getZkClient());
+ // Create mock live instance znodes to trigger the internal callback
handling logic which will
+ // modify the handler list.
+ setupLiveInstances(clusterName, new int[] { 1, 2 });
+ // Start new session handling so the manager will call the initHandler()
for initializing all
+ // existing handlers.
+ helixManager.proceedNewSessionHandling();
+ // Ensure the new session has been processed.
+ TestHelper.verify(() -> helixManager.getHandleNewSessionEndTime() != 0,
3000);
+ // Verify that both new mock current state callback handlers have been
initialized normally.
+ // Note that if there is concurrent modification that cause errors, one
of the callback will
+ // not be initialized normally.
+ for (CallbackHandler handler : helixManager.getHandlers()) {
+ Assert.assertTrue(handler.isReady(),
+ "CallbackHandler is not initialized as expected. It might be
caused by a ConcurrentModificationException");
+ }
+ } finally {
+ TestHelper.dropCluster(clusterName, _gZkClient);
+ }
+ }
+
static class BlockingHandleNewSessionZkHelixManager extends ZKHelixManager {
private final Semaphore newSessionHandlingCount = new Semaphore(1);
private long handleNewSessionStartTime = 0L;
@@ -485,6 +561,8 @@ public class TestHandleNewSession extends ZkTestBase {
}
void proceedNewSessionHandling() {
+ handleNewSessionStartTime = 0L;
+ handleNewSessionEndTime = 0L;
newSessionHandlingCount.release();
}