Updated Branches: refs/heads/helix-provisioning 852be0ccb -> 8b1763585
Create an abstract participant service Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8b176358 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8b176358 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8b176358 Branch: refs/heads/helix-provisioning Commit: 8b176358583ca39fd2bba5725e423d9b9baba662 Parents: 852be0c Author: Kanak Biscuitwala <[email protected]> Authored: Wed Jan 15 20:59:20 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Jan 15 20:59:20 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/HelixService.java | 8 +- .../stages/ContainerProvisioningStage.java | 2 + .../manager/zk/AbstractParticipantService.java | 106 +++++++++++++++++++ .../helix/manager/zk/ZkHelixAutoController.java | 4 +- .../helix/manager/zk/ZkHelixController.java | 4 +- .../helix/manager/zk/ZkHelixParticipant.java | 4 +- .../helix/integration/TestHelixConnection.java | 8 +- .../integration/TestLocalContainerProvider.java | 10 +- .../manager/zk/TestZkHelixAutoController.java | 4 +- .../helix/manager/zk/TestZkHelixController.java | 6 +- .../manager/zk/TestZkHelixParticipant.java | 4 +- .../helix/examples/LogicalModelExample.java | 12 +-- .../provisioning/yarn/ContainerParticipant.java | 26 ++--- .../yarn/HelixYarnApplicationMasterMain.java | 2 +- 14 files changed, 148 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/HelixService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixService.java b/helix-core/src/main/java/org/apache/helix/HelixService.java index a1ce0ec..40e9bae 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixService.java +++ b/helix-core/src/main/java/org/apache/helix/HelixService.java @@ -24,12 +24,12 @@ package org.apache.helix; */ public interface HelixService { /** - * start helix service async + * start helix service */ - void startAsync(); + void start(); /** - * stop helix service async + * stop helix service */ - void stopAsync(); + void stop(); } http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index a17251d..fdd6d4e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -96,6 +96,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage { final Cluster cluster = event.getAttribute("ClusterDataCache"); final Collection<Participant> participants = cluster.getParticipantMap().values(); + // TODO: if a process died, we need to mark it as stopped or something + // Participants registered in helix // Give those participants to targetprovider // Provide the response that contains, new containerspecs, containers to be released, http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java new file mode 100644 index 0000000..2e5eafa --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java @@ -0,0 +1,106 @@ +package org.apache.helix.manager.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixConnection; +import org.apache.helix.HelixParticipant; +import org.apache.helix.PreConnectCallback; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ParticipantId; + +import com.google.common.util.concurrent.AbstractService; + +/** + * A modeling of a helix participant as a self-contained service. + */ +public abstract class AbstractParticipantService extends AbstractService { + private final ClusterId _clusterId; + private final ParticipantId _participantId; + private HelixParticipant _participant; + private HelixConnection _connection; + + /** + * Initialize the service. + * @param connection A live Helix connection + * @param clusterId the cluster to join + * @param participantId a unique identifier that this participant will join with + */ + public AbstractParticipantService(HelixConnection connection, ClusterId clusterId, + ParticipantId participantId) { + _connection = connection; + _clusterId = clusterId; + _participantId = participantId; + } + + @Override + protected void doStart() { + _participant = _connection.createParticipant(_clusterId, _participantId); + + // add a preconnect callback + _participant.addPreConnectCallback(new PreConnectCallback() { + @Override + public void onPreConnect() { + onPreJoinCluster(); + } + }); + + // register state machine and other initialization + init(); + + // start and notify + if (!_connection.isConnected()) { + _connection.connect(); + } + _participant.start(); + notifyStarted(); + } + + @Override + protected void doStop() { + _participant.stop(); + notifyStopped(); + } + + /** + * Initialize the participant. For example, here is where you can register a state machine: <br/> + * <br/> + * <code> + * HelixParticipant participant = getParticipant(); + * participant.getStateMachineEngine().registerStateModelFactory(stateModelDefId, factory); + * </code><br/> + * <br/> + * This code is called prior to starting the participant. + */ + public abstract void init(); + + /** + * Complete any tasks that require a live Helix connection. This function is called before the + * participant declares itself ready to receive state transitions. + */ + public abstract void onPreJoinCluster(); + + /** + * Get an instantiated participant instance. + * @return HelixParticipant + */ + public HelixParticipant getParticipant() { + return _participant; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java index 136d47e..1d4b225 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java @@ -78,13 +78,13 @@ public class ZkHelixAutoController implements HelixAutoController { } @Override - public void startAsync() { + public void start() { _connection.addConnectionStateListener(this); onConnected(); } @Override - public void stopAsync() { + public void stop() { _connection.removeConnectionStateListener(this); onDisconnecting(); } http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java index 3091a90..51bb746 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java @@ -96,13 +96,13 @@ public class ZkHelixController implements HelixController { } @Override - public void startAsync() { + public void start() { _connection.addConnectionStateListener(this); onConnected(); } @Override - public void stopAsync() { + public void stop() { _connection.removeConnectionStateListener(this); onDisconnecting(); } http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java index f8f7a46..c8748d1 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java @@ -440,13 +440,13 @@ public class ZkHelixParticipant implements HelixParticipant { } @Override - public void startAsync() { + public void start() { _connection.addConnectionStateListener(this); onConnected(); } @Override - public void stopAsync() { + public void stop() { _connection.removeConnectionStateListener(this); onDisconnecting(); } http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java index b415393..9b772f2 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java @@ -134,14 +134,14 @@ public class TestHelixConnection extends ZkUnitTestBase { // start controller HelixController controller = connection.createController(clusterId, controllerId); - controller.startAsync(); + controller.start(); // start participant HelixParticipant participant = connection.createParticipant(clusterId, participantId); participant.getStateMachineEngine().registerStateModelFactory( StateModelDefId.from("MasterSlave"), new MockStateModelFactory()); - participant.startAsync(); + participant.start(); // verify final HelixDataAccessor accessor = connection.createDataAccessor(clusterId); @@ -164,8 +164,8 @@ public class TestHelixConnection extends ZkUnitTestBase { Assert.assertTrue(success); // clean up - controller.stopAsync(); - participant.stopAsync(); + controller.stop(); + participant.stop(); connection.disconnect(); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java index 2f1d397..7df03f3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java @@ -115,12 +115,12 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { // start controller ControllerId controllerId = ControllerId.from("controller1"); HelixController controller = connection.createController(clusterId, controllerId); - controller.startAsync(); // TODO: is this really async? + controller.start(); Thread.sleep(10000); // clean up - controller.stopAsync(); // TODO: is this really async? + controller.stop(); connection.disconnect(); Assert.assertEquals(allocated, MAX_PARTICIPANTS); @@ -148,13 +148,13 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { _participant = connection.createParticipant(_clusterId, _participantId); _participant.getStateMachineEngine().registerStateModelFactory( StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory()); - _participant.startAsync(); + _participant.start(); notifyStarted(); } @Override protected void doStop() { - _participant.stopAsync(); + _participant.stop(); notifyStopped(); } @@ -242,7 +242,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { } @Override - public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant ) { + public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant) { ParticipantService participantService = new ParticipantService(_clusterId, _containerParticipants.get(containerId)); participantService.startAsync(); http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java index 28b1477..856707b 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java @@ -65,7 +65,7 @@ public class TestZkHelixAutoController extends ZkUnitTestBase { int port = 12918 + i; ControllerId controllerId = ControllerId.from("localhost_" + port); controllers[i] = connection.createAutoController(clusterId, controllerId); - controllers[i].startAsync(); + controllers[i].start(); } // check live-instance znode for localhost_12918/12919 exists @@ -84,7 +84,7 @@ public class TestZkHelixAutoController extends ZkUnitTestBase { Assert.assertEquals(leader.getInstanceName(), controllers[0].getControllerId().stringify()); // stop controller localhost_12918 - controllers[0].stopAsync(); + controllers[0].stop(); // check live-instance znode for localhost_12918 is gone String instanceName = controllers[0].getControllerId().stringify(); http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java index 0127edb..beac6aa 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java @@ -63,7 +63,7 @@ public class TestZkHelixController extends ZkUnitTestBase { ClusterId clusterId = ClusterId.from(clusterName); ControllerId controllerId = ControllerId.from("controller"); HelixController controller = connection.createController(clusterId, controllerId); - controller.startAsync(); + controller.start(); // check leader znode exists HelixDataAccessor accessor = connection.createDataAccessor(clusterId); @@ -73,7 +73,7 @@ public class TestZkHelixController extends ZkUnitTestBase { Assert.assertEquals(leader.getInstanceName(), controllerId.stringify()); // stop participant - controller.stopAsync(); + controller.stop(); // check leader znode is gone Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader())); @@ -120,7 +120,7 @@ public class TestZkHelixController extends ZkUnitTestBase { // start controller HelixController controller = connection.createController(clusterId, controllerId); - controller.startAsync(); + controller.start(); // check live-instance znode for localhost_12918 exists final HelixDataAccessor accessor = http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java index 466d0b3..e566ef2 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java @@ -71,7 +71,7 @@ public class TestZkHelixParticipant extends ZkUnitTestBase { participants[i].getStateMachineEngine().registerStateModelFactory( StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory()); - participants[i].startAsync(); + participants[i].start(); } // check live-instance znode for localhost_12918/12919 exist @@ -85,7 +85,7 @@ public class TestZkHelixParticipant extends ZkUnitTestBase { } // stop participant localhost_12918 - participants[0].stopAsync(); + participants[0].stop(); // check live-instance znode for localhost_12918 is gone Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(participants[0] http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java ---------------------------------------------------------------------- diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java index 880d31c..515fdab 100644 --- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java +++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java @@ -118,31 +118,31 @@ public class LogicalModelExample { // start the controller ControllerId controllerId = ControllerId.from("exampleController"); HelixController helixController = connection.createController(clusterId, controllerId); - helixController.startAsync(); + helixController.start(); // start the specified participant HelixParticipant helixParticipant = connection.createParticipant(clusterId, participant.getId()); helixParticipant.getStateMachineEngine().registerStateModelFactory( lockUnlock.getStateModelDefId(), new LockUnlockFactory()); - helixParticipant.startAsync(); + helixParticipant.start(); // start another participant via auto join HelixParticipant autoJoinParticipant = connection.createParticipant(clusterId, ParticipantId.from("localhost_12120")); autoJoinParticipant.getStateMachineEngine().registerStateModelFactory( lockUnlock.getStateModelDefId(), new LockUnlockFactory()); - autoJoinParticipant.startAsync(); + autoJoinParticipant.start(); Thread.sleep(5000); printExternalView(connection, clusterId, resource.getId()); // stop the participants - helixParticipant.stopAsync(); - autoJoinParticipant.stopAsync(); + helixParticipant.stop(); + autoJoinParticipant.stop(); // stop the controller - helixController.stopAsync(); + helixController.stop(); // drop the cluster dropCluster(clusterId, connection); http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java index 9c80d87..81a9c56 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java @@ -3,37 +3,25 @@ package org.apache.helix.provisioning.yarn; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.helix.HelixConnection; -import org.apache.helix.HelixParticipant; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.manager.zk.AbstractParticipantService; -import com.google.common.util.concurrent.AbstractService; - -public class ContainerParticipant extends AbstractService { +public class ContainerParticipant extends AbstractParticipantService { private static final Log LOG = LogFactory.getLog(ContainerParticipant.class); - private final ClusterId _clusterId; - private final ParticipantId _participantId; - private HelixParticipant _participant; - private HelixConnection _connection; public ContainerParticipant(HelixConnection connection, ClusterId clusterId, ParticipantId participantId) { - _connection = connection; - _clusterId = clusterId; - _participantId = participantId; + super(connection, clusterId, participantId); } @Override - protected void doStart() { - _participant = _connection.createParticipant(_clusterId, _participantId); - // register statemachine - _participant.startAsync(); - notifyStarted(); + public void init() { + // register a state model } @Override - protected void doStop() { - _participant.stopAsync(); - notifyStopped(); + public void onPreJoinCluster() { + // do tasks that require a connection } } http://git-wip-us.apache.org/repos/asf/helix/blob/8b176358/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java index 2356e91..c84b627 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java @@ -115,7 +115,7 @@ public class HelixYarnApplicationMasterMain { // start controller ControllerId controllerId = ControllerId.from("controller1"); HelixController controller = connection.createController(clusterId, controllerId); - controller.startAsync(); + controller.start(); Thread shutdownhook = new Thread(new Runnable() { @Override
