Updated Branches: refs/heads/master 6952c8f00 -> f1ffa8619
[HELIX-364] Ensure participants and controllers sharing a session id are treated uniquely, rb=17256 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5cf39b96 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5cf39b96 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5cf39b96 Branch: refs/heads/master Commit: 5cf39b965cec3fc72c689d07ddb8642cde7f43a6 Parents: 7fca871 Author: Kanak Biscuitwala <[email protected]> Authored: Thu Jan 23 11:33:57 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Fri Jan 24 17:29:21 2014 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 11 +- .../stages/ExternalViewComputeStage.java | 13 +- .../manager/zk/DistributedLeaderElection.java | 64 +----- .../helix/manager/zk/ZkHelixLeaderElection.java | 21 +- .../DistClusterControllerElection.java | 74 +------ .../helix/integration/TestSchedulerMessage.java | 1 + .../helix/integration/TestSharedConnection.java | 199 +++++++++++++++++++ 7 files changed, 236 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 28a5b06..e9924a2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -43,6 +43,7 @@ import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; +import org.apache.helix.api.id.SessionId; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; @@ -484,7 +485,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>(); for (LiveInstance liveInstance : liveInstances) { curInstances.put(liveInstance.getInstanceName(), liveInstance); - curSessions.put(liveInstance.getTypedSessionId().stringify(), liveInstance); + curSessions.put(liveInstance.getInstanceName() + "|" + liveInstance.getSessionId(), + liveInstance); } Map<String, LiveInstance> lastInstances = _lastSeenInstances.get(); @@ -497,7 +499,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC if (!curSessions.containsKey(session)) { // remove current-state listener for expired session String instanceName = lastSessions.get(session).getInstanceName(); - manager.removeListener(keyBuilder.currentStates(instanceName, session), this); + SessionId sessionId = lastSessions.get(session).getTypedSessionId(); + manager + .removeListener(keyBuilder.currentStates(instanceName, sessionId.toString()), this); } } } @@ -514,9 +518,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC for (String session : curSessions.keySet()) { if (lastSessions == null || !lastSessions.containsKey(session)) { String instanceName = curSessions.get(session).getInstanceName(); + SessionId sessionId = curSessions.get(session).getTypedSessionId(); try { // add current-state listeners for new sessions - manager.addCurrentStateChangeListener(this, instanceName, session); + manager.addCurrentStateChangeListener(this, instanceName, sessionId.toString()); logger.info(manager.getInstanceName() + " added current-state listener for instance: " + instanceName + ", session: " + session + ", listener: " + this); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index a15e6b3..e8e42bf 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -46,7 +46,6 @@ import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.controller.rebalancer.config.RebalancerConfig; -import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; @@ -120,12 +119,14 @@ public class ExternalViewComputeStage extends AbstractBaseStage { ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"); Resource currentResource = cluster.getResourceMap().get(view.getResourceId()); - if (currentResource != null) { + if (clusterStatusMonitor != null && currentResource != null) { IdealState idealState = currentResource.getIdealState(); - if (clusterStatusMonitor != null - && !idealState.getStateModelDefRef().equalsIgnoreCase( - DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { - clusterStatusMonitor.onExternalViewChange(view, idealState); + if (idealState != null) { + StateModelDefId stateModelDefId = idealState.getStateModelDefId(); + if (stateModelDefId != null + && !stateModelDefId.equals(StateModelDefId.SchedulerTaskQueue)) { + clusterStatusMonitor.onExternalViewChange(view, idealState); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java index 9836020..86b8d41 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java @@ -19,7 +19,6 @@ package org.apache.helix.manager.zk; * under the License. */ -import java.lang.management.ManagementFactory; import java.util.List; import org.apache.helix.ControllerChangeListener; @@ -29,11 +28,7 @@ import org.apache.helix.HelixTimerTask; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.PropertyType; import org.apache.helix.controller.GenericHelixController; -import org.apache.helix.controller.restlet.ZKPropertyTransferServer; -import org.apache.helix.model.LeaderHistory; -import org.apache.helix.model.LiveInstance; import org.apache.log4j.Logger; /** @@ -84,12 +79,12 @@ public class DistributedLeaderElection implements ControllerChangeListener { Builder keyBuilder = accessor.keyBuilder(); while (accessor.getProperty(keyBuilder.controllerLeader()) == null) { - boolean success = tryUpdateController(manager); + boolean success = ZkHelixLeaderElection.tryUpdateController(manager); if (success) { LOG.info(_manager.getInstanceName() + " acquired leadership for cluster: " + _manager.getClusterName()); - updateHistory(manager); + ZkHelixLeaderElection.updateHistory(manager); _manager.getHelixDataAccessor().getBaseDataAccessor().reset(); controllerHelper.addListenersToController(_controller); controllerHelper.startControllerTimerTasks(); @@ -111,59 +106,4 @@ public class DistributedLeaderElection implements ControllerChangeListener { LOG.error("Exception when trying to become leader", e); } } - - private boolean tryUpdateController(HelixManager manager) { - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - - LiveInstance leader = new LiveInstance(manager.getInstanceName()); - try { - leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName()); - leader.setSessionId(manager.getSessionId()); - leader.setHelixVersion(manager.getVersion()); - if (ZKPropertyTransferServer.getInstance() != null) { - String zkPropertyTransferServiceUrl = - ZKPropertyTransferServer.getInstance().getWebserviceUrl(); - if (zkPropertyTransferServiceUrl != null) { - leader.setWebserviceUrl(zkPropertyTransferServiceUrl); - } - } else { - LOG.warn("ZKPropertyTransferServer instnace is null"); - } - boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader); - if (success) { - return true; - } else { - LOG.info("Unable to become leader probably because some other controller becames the leader"); - } - } catch (Exception e) { - LOG.error( - "Exception when trying to updating leader record in cluster:" + manager.getClusterName() - + ". Need to check again whether leader node has been created or not", e); - } - - leader = accessor.getProperty(keyBuilder.controllerLeader()); - if (leader != null) { - String leaderSessionId = leader.getTypedSessionId().stringify(); - LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: " - + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId); - - if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) { - return true; - } - } - return false; - } - - private void updateHistory(HelixManager manager) { - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - - LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory()); - if (history == null) { - history = new LeaderHistory(PropertyType.HISTORY.toString()); - } - history.updateHistory(manager.getClusterName(), manager.getInstanceName()); - accessor.setProperty(keyBuilder.controllerLeaderHistory(), history); - } } http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java index 77da158..cc99b8e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java @@ -22,8 +22,6 @@ package org.apache.helix.manager.zk; import java.lang.management.ManagementFactory; import org.apache.helix.ControllerChangeListener; -import org.apache.helix.HelixConnection; -import org.apache.helix.HelixController; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; @@ -84,7 +82,6 @@ public class ZkHelixLeaderElection implements ControllerChangeListener { || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) { LOG.info(_controllerId + " is trying to acquire leadership for cluster: " + _clusterId); - while (accessor.getProperty(keyBuilder.controllerLeader()) == null) { boolean success = tryUpdateController(_manager); if (success) { @@ -122,7 +119,12 @@ public class ZkHelixLeaderElection implements ControllerChangeListener { } } - private boolean tryUpdateController(HelixManager manager) { + /** + * Try to become the leader controller + * @param manager a live helix manager connection + * @return true if this controller has been elected the leader, false otherwise + */ + public static boolean tryUpdateController(HelixManager manager) { HelixDataAccessor accessor = manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -155,17 +157,22 @@ public class ZkHelixLeaderElection implements ControllerChangeListener { leader = accessor.getProperty(keyBuilder.controllerLeader()); if (leader != null) { String leaderSessionId = leader.getSessionId(); + String leaderId = leader.getId(); LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: " + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId); - - if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) { + if (leaderId != null && leaderId.equals(manager.getInstanceName()) && leaderSessionId != null + && leaderSessionId.equals(manager.getSessionId())) { return true; } } return false; } - private void updateHistory(HelixManager manager) { + /** + * Update the history with this controller as the most recent leader + * @param manager active helix manager connection + */ + public static void updateHistory(HelixManager manager) { HelixDataAccessor accessor = manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java index 45bee64..ee7efcd 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java @@ -19,8 +19,6 @@ package org.apache.helix.participant; * under the License. */ -import java.lang.management.ManagementFactory; - import org.apache.helix.ControllerChangeListener; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -28,12 +26,9 @@ import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.PropertyType; import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.HelixControllerMain; -import org.apache.helix.controller.restlet.ZKPropertyTransferServer; -import org.apache.helix.model.LeaderHistory; -import org.apache.helix.model.LiveInstance; +import org.apache.helix.manager.zk.ZkHelixLeaderElection; import org.apache.log4j.Logger; // TODO: merge with GenericHelixController @@ -75,9 +70,9 @@ public class DistClusterControllerElection implements ControllerChangeListener { Builder keyBuilder = accessor.keyBuilder(); while (accessor.getProperty(keyBuilder.controllerLeader()) == null) { - boolean success = tryUpdateController(manager); + boolean success = ZkHelixLeaderElection.tryUpdateController(manager); if (success) { - updateHistory(manager); + ZkHelixLeaderElection.updateHistory(manager); if (type == InstanceType.CONTROLLER) { HelixControllerMain.addListenersToController(manager, _controller); manager.startTimerTasks(); @@ -95,9 +90,8 @@ public class DistClusterControllerElection implements ControllerChangeListener { } } - } - else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) { - if(_leader != null) { + } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) { + if (_leader != null) { _leader.disconnect(); } _controller.shutdownClusterStatusMonitor(manager.getClusterName()); @@ -106,62 +100,4 @@ public class DistClusterControllerElection implements ControllerChangeListener { LOG.error("Exception when trying to become leader", e); } } - - private boolean tryUpdateController(HelixManager manager) { - // DataAccessor dataAccessor = manager.getDataAccessor(); - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - - LiveInstance leader = new LiveInstance(manager.getInstanceName()); - try { - leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName()); - // TODO: this session id is not the leader's session id in - // distributed mode - leader.setSessionId(manager.getSessionId()); - leader.setHelixVersion(manager.getVersion()); - if (ZKPropertyTransferServer.getInstance() != null) { - String zkPropertyTransferServiceUrl = - ZKPropertyTransferServer.getInstance().getWebserviceUrl(); - if (zkPropertyTransferServiceUrl != null) { - leader.setWebserviceUrl(zkPropertyTransferServiceUrl); - } - } else { - LOG.warn("ZKPropertyTransferServer instnace is null"); - } - boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader); - if (success) { - return true; - } else { - LOG.info("Unable to become leader probably because some other controller becames the leader"); - } - } catch (Exception e) { - LOG.error( - "Exception when trying to updating leader record in cluster:" + manager.getClusterName() - + ". Need to check again whether leader node has been created or not", e); - } - - leader = accessor.getProperty(keyBuilder.controllerLeader()); - if (leader != null) { - String leaderSessionId = leader.getTypedSessionId().stringify(); - LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: " - + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId); - - if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) { - return true; - } - } - return false; - } - - private void updateHistory(HelixManager manager) { - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - - LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory()); - if (history == null) { - history = new LeaderHistory(PropertyType.HISTORY.toString()); - } - history.updateHistory(manager.getClusterName(), manager.getInstanceName()); - accessor.setProperty(keyBuilder.controllerLeaderHistory(), history); - } } http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java index 6066859..d78bd9d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java @@ -725,6 +725,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ @Test() public void testSchedulerMsg4() throws Exception { _factory._results.clear(); + Thread.sleep(2000); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { _participants[i].getMessagingService().registerMessageHandlerFactory( http://git-wip-us.apache.org/repos/asf/helix/blob/5cf39b96/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java new file mode 100644 index 0000000..bf89cdb --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSharedConnection.java @@ -0,0 +1,199 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import java.util.Map; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConnection; +import org.apache.helix.HelixController; +import org.apache.helix.HelixParticipant; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.api.State; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ControllerId; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.api.id.StateModelDefId; +import org.apache.helix.manager.zk.HelixConnectionAdaptor; +import org.apache.helix.manager.zk.ZkHelixConnection; +import org.apache.helix.manager.zk.ZkHelixLeaderElection; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Ensure that the external view is able to update properly when participants share a connection. + */ +public class TestSharedConnection extends ZkUnitTestBase { + /** + * Ensure that the external view is able to update properly when participants share a connection. + */ + @Test + public void testSharedParticipantConnection() throws Exception { + final int NUM_PARTICIPANTS = 2; + final int NUM_PARTITIONS = 4; + final int NUM_REPLICAS = 2; + final String RESOURCE_NAME = "TestDB0"; + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance + + // Connect + HelixConnection connection = new ZkHelixConnection(ZK_ADDR); + connection.connect(); + + // Start some participants + HelixParticipant[] participants = new HelixParticipant[NUM_PARTICIPANTS]; + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + participants[i] = + connection.createParticipant(ClusterId.from(clusterName), + ParticipantId.from("localhost_" + (12918 + i))); + participants[i].getStateMachineEngine().registerStateModelFactory( + StateModelDefId.from("OnlineOffline"), new TestHelixConnection.MockStateModelFactory()); + participants[i].startAsync(); + } + + // Start the controller + HelixController controller = + connection.createController(ClusterId.from(clusterName), ControllerId.from("controller")); + controller.startAsync(); + Thread.sleep(500); + + // Verify balanced cluster + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // Drop a partition from the first participant + HelixAdmin admin = connection.createClusterManagementTool(); + IdealState idealState = admin.getResourceIdealState(clusterName, RESOURCE_NAME); + Map<ParticipantId, State> participantStateMap = + idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0")); + participantStateMap.remove(ParticipantId.from("localhost_12918")); + idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_0"), participantStateMap); + admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState); + Thread.sleep(1000); + + // Verify balanced cluster + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // Drop a partition from the second participant + participantStateMap = idealState.getParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1")); + participantStateMap.remove(ParticipantId.from("localhost_12919")); + idealState.setParticipantStateMap(PartitionId.from(RESOURCE_NAME + "_1"), participantStateMap); + admin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState); + Thread.sleep(1000); + + // Verify balanced cluster + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // Clean up + controller.stopAsync(); + for (HelixParticipant participant : participants) { + participant.stopAsync(); + } + admin.dropCluster(clusterName); + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + /** + * Ensure that only one controller with a shared connection thinks it's leader + */ + @Test + public void testSharedControllerConnection() throws Exception { + final int NUM_PARTICIPANTS = 2; + final int NUM_PARTITIONS = 4; + final int NUM_REPLICAS = 2; + final int NUM_CONTROLLERS = 2; + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "OnlineOffline", RebalanceMode.CUSTOMIZED, true); // do rebalance + + // Connect + HelixConnection connection = new ZkHelixConnection(ZK_ADDR); + connection.connect(); + + // Create a couple controllers + HelixController[] controllers = new HelixController[NUM_CONTROLLERS]; + for (int i = 0; i < NUM_CONTROLLERS; i++) { + controllers[i] = + connection.createController(ClusterId.from(clusterName), + ControllerId.from("controller_" + i)); + controllers[i].startAsync(); + } + Thread.sleep(1000); + + // Now verify that exactly one is leader + int leaderCount = 0; + for (HelixController controller : controllers) { + HelixConnectionAdaptor adaptor = new HelixConnectionAdaptor(controller); + boolean result = ZkHelixLeaderElection.tryUpdateController(adaptor); + if (result) { + leaderCount++; + } + } + Assert.assertEquals(leaderCount, 1); + + // Clean up + for (HelixController controller : controllers) { + controller.stopAsync(); + } + HelixAdmin admin = connection.createClusterManagementTool(); + admin.dropCluster(clusterName); + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +}
