Start participants for multiple in one JVM

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4b50445d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4b50445d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4b50445d

Branch: refs/heads/master
Commit: 4b50445d89182d5ca48d4557c649a64b136e9f79
Parents: 4a76f03
Author: Junkai Xue <j...@linkedin.com>
Authored: Mon Apr 9 12:28:35 2018 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Wed Apr 11 13:52:26 2018 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   4 +
 ...rResourceOnlineOfflineStateModelFactory.java |  77 +++++++
 .../SegmentOnlineOfflineStateModelFactory.java  |  53 +++++
 .../tools/commandtools/ExampleParticipant.java  | 207 +++++++++++++++++++
 4 files changed, 341 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index b6a2539..a29243e 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -220,6 +220,10 @@ under the License.
               <name>start-helix-participant</name>
             </program>
             <program>
+              
<mainClass>org.apache.helix.tools.commandtools.ExampleParticipant</mainClass>
+              <name>start-participants</name>
+            </program>
+            <program>
               <mainClass>org.apache.helix.tools.LocalZKServer</mainClass>
               <name>start-standalone-zookeeper</name>
             </program>

http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
 
b/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..0b1b81f
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -0,0 +1,77 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerResourceOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerResourceOnlineOfflineStateModelFactory.class);
+
+  public BrokerResourceOnlineOfflineStateModelFactory() {
+
+  }
+
+  public static String getStateModelDef() {
+    return "BrokerResourceOnlineOfflineStateModel";
+  }
+
+  public StateModel createNewStateModel(String resourceName) {
+    return new 
BrokerResourceOnlineOfflineStateModelFactory.BrokerResourceOnlineOfflineStateModel();
+  }
+
+  @StateModelInfo(
+      states = {"{'OFFLINE','ONLINE', 'DROPPED'}"},
+      initialState = "OFFLINE"
+  )
+  public class BrokerResourceOnlineOfflineStateModel extends StateModel {
+    public BrokerResourceOnlineOfflineStateModel() {
+    }
+
+    @Transition(
+        from = "OFFLINE",
+        to = "ONLINE"
+    )
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
+        LOGGER.info("Become Online from Offline");
+    }
+
+    @Transition(
+        from = "ONLINE",
+        to = "OFFLINE"
+    )
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
+      LOGGER.info("Become Offline from Online");
+
+    }
+
+    @Transition(
+        from = "OFFLINE",
+        to = "DROPPED"
+    )
+    public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
+      LOGGER.info("Become Dropped from Offline");
+    }
+
+    @Transition(
+        from = "ONLINE",
+        to = "DROPPED"
+    )
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext 
context) {
+      LOGGER.info("Become Dropped from Online");
+    }
+
+    @Transition(
+        from = "ERROR",
+        to = "OFFLINE"
+    )
+    public void onBecomeOfflineFromError(Message message, NotificationContext 
context) {
+      LOGGER.info("Become Offline from Error");
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
 
b/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..30a23d8
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,53 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
+  public SegmentOnlineOfflineStateModelFactory() {
+  }
+
+  @Override
+  public StateModel createNewStateModel(String partitionName) {
+    final SegmentOnlineOfflineStateModel SegmentOnlineOfflineStateModel = new 
SegmentOnlineOfflineStateModel();
+    return SegmentOnlineOfflineStateModel;
+  }
+
+  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = 
"OFFLINE")
+  public static class SegmentOnlineOfflineStateModel extends StateModel {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentOnlineOfflineStateModel.class);
+
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
+      LOGGER.info("Become Online from Offline");
+    }
+
+    // Remove segment from InstanceDataManager.
+    // Still keep the data files in local.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
+      LOGGER.info("Become Offline from Online");
+
+    }
+
+    // Delete segment from local directory.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
+      LOGGER.info("Become Dropped from Offline");
+
+    }
+
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext 
context) {
+      LOGGER.info("Become Dropped from Online");
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/4b50445d/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
 
b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
new file mode 100644
index 0000000..3085075
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
@@ -0,0 +1,207 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.BrokerResourceOnlineOfflineStateModelFactory;
+import org.apache.helix.examples.LeaderStandbyStateModelFactory;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.examples.SegmentOnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.HelixManagerShutdownHook;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExampleParticipant {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExampleParticipant.class);
+
+  public static final String zkServer = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String instances = "instances";
+  public static final String help = "help";
+  public static final String transDelay = "transDelay";
+
+  private final String zkConnectString;
+  private final String clusterName;
+  private final String instanceName;
+  private HelixManager manager;
+
+  private StateModelFactory<StateModel> stateModelFactory;
+  private final int delay;
+
+  public ExampleParticipant(String zkConnectString, String clusterName, String 
instanceName,
+      int delay) {
+    this.zkConnectString = zkConnectString;
+    this.clusterName = clusterName;
+    this.instanceName = instanceName;
+    this.delay = delay;
+  }
+
+  public void start() throws Exception {
+    manager =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceName, 
InstanceType.PARTICIPANT,
+            zkConnectString);
+
+    // genericStateMachineHandler = new StateMachineEngine();
+    // genericStateMachineHandler.registerStateModelFactory(stateModelType,
+    // stateModelFactory);
+
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave",
+        new MasterSlaveStateModelFactory(this.instanceName, delay));
+    stateMach.registerStateModelFactory("OnlineOffline",
+        new OnlineOfflineStateModelFactory(this.instanceName, delay));
+    stateMach.registerStateModelFactory("LeaderStandby",
+        new LeaderStandbyStateModelFactory(this.instanceName, delay));
+    
stateMach.registerStateModelFactory("BrokerResourceOnlineOfflineStateModel",
+        new BrokerResourceOnlineOfflineStateModelFactory());
+    stateMach.registerStateModelFactory("SegmentOnlineOfflineStateModel",
+        new SegmentOnlineOfflineStateModelFactory());
+
+    manager.connect();
+    manager.getMessagingService()
+        .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), 
stateMach);
+  }
+
+  public void stop() {
+    manager.disconnect();
+  }
+
+  public HelixManager getManager() {
+    return manager;
+  }
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions() {
+    Option helpOption =
+        OptionBuilder.withLongOpt(help).withDescription("Prints command-line 
options info")
+            .create();
+
+    Option zkServerOption =
+        OptionBuilder.withLongOpt(zkServer).withDescription("Provide zookeeper 
address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption =
+        OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster 
name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option instancesOption =
+        OptionBuilder.withLongOpt(instances).withDescription("Provide instance 
names, separated by ':").create();
+    instancesOption.setArgs(1);
+    instancesOption.setRequired(true);
+    instancesOption.setArgName("Instance names (Required)");
+
+    Option transDelayOption =
+        OptionBuilder.withLongOpt(transDelay).withDescription("Provide state 
trans delay").create();
+    transDelayOption.setArgs(1);
+    transDelayOption.setRequired(false);
+    transDelayOption.setArgName("Delay time in state transition, in MS");
+
+    OptionGroup optionGroup = new OptionGroup();
+    optionGroup.addOption(zkServerOption);
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(clusterOption);
+    options.addOption(instancesOption);
+    options.addOption(transDelayOption);
+
+    options.addOptionGroup(optionGroup);
+
+    return options;
+  }
+
+  public static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + ExampleParticipant.class.getName(), 
cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs) throws 
Exception {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    try {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line 
options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int delay = 0;
+
+    CommandLine cmd = processCommandLineArgs(args);
+    String zkConnectString = cmd.getOptionValue(zkServer);
+    String clusterName = cmd.getOptionValue(cluster);
+    String instanceNames = cmd.getOptionValue(instances);
+    List<String> hosts = Arrays.asList(instanceNames.split(":"));
+
+    if (cmd.hasOption(transDelay)) {
+      try {
+        delay = Integer.parseInt(cmd.getOptionValue(transDelay));
+        if (delay < 0) {
+          throw new Exception("delay must be positive");
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+        delay = 0;
+      }
+    }
+
+    System.out.println("Starting Instances with ZK:" + zkConnectString + ", 
cluster: " + clusterName
+        + ", instances: " + hosts);
+
+    for (String instanceName : hosts) {
+      System.out.println("Starting Instance:" + instanceName);
+      ExampleParticipant process =
+          new ExampleParticipant(zkConnectString, clusterName, instanceName, 
delay);
+      process.start();
+      System.out.println("Started Instance:" + instanceName);
+      Runtime.getRuntime().addShutdownHook(new 
HelixManagerShutdownHook(process.getManager()));
+    }
+
+    Thread.currentThread().join();
+  }
+}

Reply via email to