STORM-1266: port backtype.storm.command.rebalance to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8aaa8388 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8aaa8388 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8aaa8388 Branch: refs/heads/master Commit: 8aaa83880de7a5bc9595a6e3aabc242c3bec5470 Parents: 3425e7d Author: Abhishek Agarwal <[email protected]> Authored: Mon Feb 22 16:38:43 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Mon Feb 22 16:38:43 2016 +0530 ---------------------------------------------------------------------- .../clj/org/apache/storm/command/rebalance.clj | 47 ----------- .../jvm/org/apache/storm/command/Rebalance.java | 86 ++++++++++++++++++++ .../org/apache/storm/command/RebalanceTest.java | 41 ++++++++++ 3 files changed, 127 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/clj/org/apache/storm/command/rebalance.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj deleted file mode 100644 index 8428d14..0000000 --- a/storm-core/src/clj/org/apache/storm/command/rebalance.clj +++ /dev/null @@ -1,47 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.command.rebalance - (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm config log]) - (:use [org.apache.storm.internal thrift]) - (:import [org.apache.storm.generated RebalanceOptions]) - (:gen-class)) - -(defn- parse-executor [^String s] - (let [eq-pos (.lastIndexOf s "=") - name (.substring s 0 eq-pos) - amt (.substring s (inc eq-pos))] - {name (Integer/parseInt amt)} - )) - -(defn -main [& args] - (let [[{wait :wait executor :executor num-workers :num-workers} [name] _] - (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)] - ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)] - ["-e" "--executor" :parse-fn parse-executor - :assoc-fn (fn [previous key val] - (assoc previous key - (if-let [oldval (get previous key)] - (merge oldval val) - val)))]) - opts (RebalanceOptions.)] - (if wait (.set_wait_secs opts wait)) - (if executor (.set_num_executors opts executor)) - (if num-workers (.set_num_workers opts num-workers)) - (with-configured-nimbus-connection nimbus - (.rebalance nimbus name opts) - (log-message "Topology " name " is rebalancing") - ))) http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/jvm/org/apache/storm/command/Rebalance.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java new file mode 100644 index 0000000..ed65950 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.command; + +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.RebalanceOptions; +import org.apache.storm.utils.NimbusClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static java.lang.String.format; + +public class Rebalance { + + private static final Logger LOG = LoggerFactory.getLogger(Rebalance.class); + + public static void main(String[] args) throws Exception { + Map<String, Object> cl = CLI.opt("w", "wait", null, CLI.AS_INT) + .opt("n", "num-workers", null, CLI.AS_INT) + .opt("e", "executor", null, new ExecutorParser(), CLI.INTO_MAP) + .arg("topologyName", CLI.FIRST_WINS) + .parse(args); + final String name = (String) cl.get("topologyName"); + final RebalanceOptions rebalanceOptions = new RebalanceOptions(); + Integer wait = (Integer) cl.get("w"); + Integer numWorkers = (Integer) cl.get("n"); + Map<String, Integer> numExecutors = (Map<String, Integer>) cl.get("e"); + + if (null != wait) { + rebalanceOptions.set_wait_secs(wait); + } + if (null != numWorkers) { + rebalanceOptions.set_num_workers(numWorkers); + } + if (null != numExecutors) { + rebalanceOptions.set_num_executors(numExecutors); + } + + NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { + @Override + public void run(Nimbus.Client nimbus) throws Exception { + nimbus.rebalance(name, rebalanceOptions); + LOG.info("Topology {} is rebalancing", name); + } + }); + } + + + static final class ExecutorParser implements CLI.Parse { + + @Override + public Object parse(String value) { + try { + int splitIndex = value.lastIndexOf('='); + String componentName = value.substring(0, splitIndex); + Integer parallelism = Integer.parseInt(value.substring(splitIndex + 1)); + Map<String, Integer> result = new HashMap<String, Integer>(); + result.put(componentName, parallelism); + return result; + } catch (Throwable ex) { + throw new IllegalArgumentException( + format("Failed to parse '%s' correctly. Expected in <component>=<parallelism> format", value), ex); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java new file mode 100644 index 0000000..cec4958 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.storm.command; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class RebalanceTest { + + @Test + public void testParser() throws Exception { + Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser(); + Map<String, Integer> componentParallelism = (Map<String, Integer>) executorParser.parse("comp1=3"); + Assert.assertEquals(3, (int) componentParallelism.get("comp1")); + } + + @Test(expected = IllegalArgumentException.class) + public void testExepction() throws Exception { + Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser(); + executorParser.parse("comp1 3"); + } +}
