Start multiple participants in one JVM
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4b50445d Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4b50445d Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4b50445d Branch: refs/heads/master Commit: 4b50445d89182d5ca48d4557c649a64b136e9f79 Parents: 4a76f03 Author: Junkai Xue <j...@linkedin.com> Authored: Mon Apr 9 12:28:35 2018 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Wed Apr 11 13:52:26 2018 -0700 ---------------------------------------------------------------------- helix-core/pom.xml | 4 + ...rResourceOnlineOfflineStateModelFactory.java | 77 +++++++ .../SegmentOnlineOfflineStateModelFactory.java | 53 +++++ .../tools/commandtools/ExampleParticipant.java | 207 +++++++++++++++++++ 4 files changed, 341 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/pom.xml ---------------------------------------------------------------------- diff --git a/helix-core/pom.xml b/helix-core/pom.xml index b6a2539..a29243e 100644 --- a/helix-core/pom.xml +++ b/helix-core/pom.xml @@ -220,6 +220,10 @@ under the License. 
<name>start-helix-participant</name>
</program>
<program>
+ <mainClass>org.apache.helix.tools.commandtools.ExampleParticipant</mainClass>
+ <name>start-participants</name>
+ </program>
+ <program>
<mainClass>org.apache.helix.tools.LocalZKServer</mainClass>
<name>start-standalone-zookeeper</name>
</program>
http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..0b1b81f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -0,0 +1,77 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example factory that hands out {@link BrokerResourceOnlineOfflineStateModel} instances.
+ * The state model only logs each transition it receives.
+ */
+public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceOnlineOfflineStateModelFactory.class);
+
+  public BrokerResourceOnlineOfflineStateModelFactory() {
+
+  }
+
+  /**
+   * @return the state model definition name this factory is meant to be registered under
+   */
+  public static String getStateModelDef() {
+    return "BrokerResourceOnlineOfflineStateModel";
+  }
+
+  /**
+   * Creates a fresh state model; the argument is not used.
+   *
+   * @param resourceName ignored
+   * @return a new {@link BrokerResourceOnlineOfflineStateModel}
+   */
+  @Override
+  public StateModel createNewStateModel(String resourceName) {
+    return new BrokerResourceOnlineOfflineStateModel();
+  }
+
+  /**
+   * OFFLINE/ONLINE/DROPPED state model whose transition callbacks only write a log line.
+   * Declared static (like the Segment example's state model) because it uses no state of the
+   * enclosing factory instance.
+   */
+  @StateModelInfo(
+      states = {"{'OFFLINE','ONLINE', 'DROPPED'}"},
+      initialState = "OFFLINE"
+  )
+  public static class BrokerResourceOnlineOfflineStateModel extends StateModel {
+    public BrokerResourceOnlineOfflineStateModel() {
+    }
+
+    @Transition(
+        from = "OFFLINE",
+        to = "ONLINE"
+    )
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      LOGGER.info("Become Online from Offline");
+    }
+
+    @Transition(
+        from = "ONLINE",
+        to = "OFFLINE"
+    )
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      LOGGER.info("Become Offline from Online");
+    }
+
+    @Transition(
+        from = "OFFLINE",
+        to = "DROPPED"
+    )
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      LOGGER.info("Become Dropped from Offline");
+    }
+
+    @Transition(
+        from = "ONLINE",
+        to = "DROPPED"
+    )
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      LOGGER.info("Become Dropped from Online");
+    }
+
+    @Transition(
+        from = "ERROR",
+        to = "OFFLINE"
+    )
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      LOGGER.info("Become Offline from Error");
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..30a23d8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,53 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example factory that hands out {@link SegmentOnlineOfflineStateModel} instances.
+ * The state model only logs each transition it receives.
+ */
+public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  public SegmentOnlineOfflineStateModelFactory() {
+  }
+
+  /**
+   * Creates a fresh state model; the argument is not used.
+   *
+   * @param partitionName ignored
+   * @return a new {@link SegmentOnlineOfflineStateModel}
+   */
+  @Override
+  public StateModel createNewStateModel(String partitionName) {
+    return new SegmentOnlineOfflineStateModel();
+  }
+
+  /** OFFLINE/ONLINE/DROPPED state model whose transition callbacks only write a log line. */
+  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public static class SegmentOnlineOfflineStateModel extends StateModel {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOnlineOfflineStateModel.class);
+
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      LOGGER.info("Become Online from Offline");
+    }
+
+    // Example only: logs the transition; no segment data is touched here.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      LOGGER.info("Become Offline from Online");
+    }
+
+    // Example only: logs the transition; nothing is deleted here.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      LOGGER.info("Become Dropped from Offline");
+    }
+
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      LOGGER.info("Become Dropped from Online");
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
new file mode 100644
index 0000000..3085075
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
@@ -0,0 +1,207 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.BrokerResourceOnlineOfflineStateModelFactory;
+import org.apache.helix.examples.LeaderStandbyStateModelFactory;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.examples.SegmentOnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.HelixManagerShutdownHook;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line tool that starts one Helix participant per instance name, all inside the
+ * current JVM. Each participant registers the example state model factories and then
+ * connects to the cluster through ZooKeeper.
+ */
+public class ExampleParticipant {
+  private static final Logger LOG = LoggerFactory.getLogger(ExampleParticipant.class);
+
+  // Long option names accepted on the command line.
+  public static final String zkServer = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String instances = "instances";
+  public static final String help = "help";
+  public static final String transDelay = "transDelay";
+
+  private final String zkConnectString;
+  private final String clusterName;
+  private final String instanceName;
+  private HelixManager manager;
+
+  // NOTE(review): never assigned or read in this class - candidate for removal.
+  private StateModelFactory<StateModel> stateModelFactory;
+  private final int delay;
+
+  /**
+   * @param zkConnectString ZooKeeper address handed to the Helix manager
+   * @param clusterName name of the Helix cluster to join
+   * @param instanceName name of this participant instance
+   * @param delay value passed to the MasterSlave, OnlineOffline and LeaderStandby example
+   *          factories (described as a state transition delay in milliseconds by the CLI help)
+   */
+  public ExampleParticipant(String zkConnectString, String clusterName, String instanceName,
+      int delay) {
+    this.zkConnectString = zkConnectString;
+    this.clusterName = clusterName;
+    this.instanceName = instanceName;
+    this.delay = delay;
+  }
+
+  /**
+   * Creates the participant's Helix manager, registers the five example state model
+   * factories, connects, and registers the state machine engine as the handler factory for
+   * STATE_TRANSITION messages.
+   *
+   * @throws Exception if creating or connecting the manager fails
+   */
+  public void start() throws Exception {
+    manager =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
+            zkConnectString);
+
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave",
+        new MasterSlaveStateModelFactory(this.instanceName, delay));
+    stateMach.registerStateModelFactory("OnlineOffline",
+        new OnlineOfflineStateModelFactory(this.instanceName, delay));
+    stateMach.registerStateModelFactory("LeaderStandby",
+        new LeaderStandbyStateModelFactory(this.instanceName, delay));
+    stateMach.registerStateModelFactory("BrokerResourceOnlineOfflineStateModel",
+        new BrokerResourceOnlineOfflineStateModelFactory());
+    stateMach.registerStateModelFactory("SegmentOnlineOfflineStateModel",
+        new SegmentOnlineOfflineStateModelFactory());
+
+    manager.connect();
+    manager.getMessagingService()
+        .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), stateMach);
+  }
+
+  /** Disconnects the manager; does nothing if {@link #start()} has not created one yet. */
+  public void stop() {
+    if (manager != null) {
+      manager.disconnect();
+    }
+  }
+
+  /**
+   * @return the manager created by {@link #start()}, or {@code null} before it has run
+   */
+  public HelixManager getManager() {
+    return manager;
+  }
+
+  /** Builds the command-line option set: zkSvr, cluster, instances, transDelay and help. */
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions() {
+    Option helpOption =
+        OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info")
+            .create();
+
+    Option zkServerOption =
+        OptionBuilder.withLongOpt(zkServer).withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption =
+        OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option instancesOption =
+        OptionBuilder.withLongOpt(instances).withDescription("Provide instance names, separated by ':'").create();
+    instancesOption.setArgs(1);
+    instancesOption.setRequired(true);
+    instancesOption.setArgName("Instance names (Required)");
+
+    Option transDelayOption =
+        OptionBuilder.withLongOpt(transDelay).withDescription("Provide state trans delay").create();
+    transDelayOption.setArgs(1);
+    transDelayOption.setRequired(false);
+    transDelayOption.setArgName("Delay time in state transition, in MS");
+
+    // Options.addOptionGroup() clears the "required" flag of every option in the group, so
+    // the group itself must be marked required for a missing zkSvr to be rejected at parse time.
+    OptionGroup optionGroup = new OptionGroup();
+    optionGroup.addOption(zkServerOption);
+    optionGroup.setRequired(true);
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(clusterOption);
+    options.addOption(instancesOption);
+    options.addOption(transDelayOption);
+
+    options.addOptionGroup(optionGroup);
+
+    return options;
+  }
+
+  /** Prints the usage text for the given options. */
+  public static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + ExampleParticipant.class.getName(), cliOptions);
+  }
+
+  /**
+   * Parses the command line. On a parse error it prints the error and the usage text to the
+   * console and exits the JVM with status 1.
+   *
+   * @param cliArgs raw command-line arguments
+   * @return the parsed command line
+   */
+  public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    try {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    // Only reached if System.exit() returns, which it normally does not.
+    return null;
+  }
+
+  /** Parses the transDelay value; falls back to 0 when it is not a non-negative integer. */
+  private static int parseDelay(String rawDelay) {
+    try {
+      int parsed = Integer.parseInt(rawDelay);
+      if (parsed >= 0) {
+        return parsed;
+      }
+      LOG.warn("Ignoring negative {} value {}; delay must be non-negative, using 0", transDelay,
+          parsed);
+    } catch (NumberFormatException e) {
+      LOG.warn("Ignoring unparseable {} value '{}'; using 0", transDelay, rawDelay, e);
+    }
+    return 0;
+  }
+
+  /**
+   * Starts one participant per name in the ':'-separated instances argument, registers a
+   * shutdown hook for each manager, and then blocks the main thread.
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLine cmd = processCommandLineArgs(args);
+    String zkConnectString = cmd.getOptionValue(zkServer);
+    String clusterName = cmd.getOptionValue(cluster);
+    String instanceNames = cmd.getOptionValue(instances);
+    List<String> hosts = Arrays.asList(instanceNames.split(":"));
+
+    int delay = cmd.hasOption(transDelay) ? parseDelay(cmd.getOptionValue(transDelay)) : 0;
+
+    System.out.println("Starting Instances with ZK:" + zkConnectString + ", cluster: " + clusterName
+        + ", instances: " + hosts);
+
+    for (String instanceName : hosts) {
+      // Inputs such as "a::b" or ":a" produce empty tokens; an empty name is not a usable instance.
+      if (instanceName.isEmpty()) {
+        LOG.warn("Skipping empty instance name in '{}'", instanceNames);
+        continue;
+      }
+      System.out.println("Starting Instance:" + instanceName);
+      ExampleParticipant process =
+          new ExampleParticipant(zkConnectString, clusterName, instanceName, delay);
+      process.start();
+      System.out.println("Started Instance:" + instanceName);
+      Runtime.getRuntime().addShutdownHook(new HelixManagerShutdownHook(process.getManager()));
+    }
+
+    // Joining the current thread never returns, which keeps the JVM (and participants) alive.
+    Thread.currentThread().join();
+  }
+}