STORM-1266: port backtype.storm.command.rebalance to java

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8aaa8388
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8aaa8388
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8aaa8388

Branch: refs/heads/master
Commit: 8aaa83880de7a5bc9595a6e3aabc242c3bec5470
Parents: 3425e7d
Author: Abhishek Agarwal <[email protected]>
Authored: Mon Feb 22 16:38:43 2016 +0530
Committer: Abhishek Agarwal <[email protected]>
Committed: Mon Feb 22 16:38:43 2016 +0530

----------------------------------------------------------------------
 .../clj/org/apache/storm/command/rebalance.clj  | 47 -----------
 .../jvm/org/apache/storm/command/Rebalance.java | 86 ++++++++++++++++++++
 .../org/apache/storm/command/RebalanceTest.java | 41 ++++++++++
 3 files changed, 127 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/clj/org/apache/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj
deleted file mode 100644
index 8428d14..0000000
--- a/storm-core/src/clj/org/apache/storm/command/rebalance.clj
+++ /dev/null
@@ -1,47 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.rebalance
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm config log])
-  (:use [org.apache.storm.internal thrift])
-  (:import [org.apache.storm.generated RebalanceOptions])
-  (:gen-class))
-
-(defn- parse-executor [^String s]
-  (let [eq-pos (.lastIndexOf s "=")
-        name (.substring s 0 eq-pos)
-        amt (.substring s (inc eq-pos))]
-    {name (Integer/parseInt amt)}
-    ))
-
-(defn -main [& args] 
-  (let [[{wait :wait executor :executor num-workers :num-workers} [name] _]
-                  (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]
-                            ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)]
-                            ["-e" "--executor"  :parse-fn parse-executor
-                             :assoc-fn (fn [previous key val]
-                                         (assoc previous key
-                                                (if-let [oldval (get previous key)]
-                                                  (merge oldval val)
-                                                  val)))])
-        opts (RebalanceOptions.)]
-    (if wait (.set_wait_secs opts wait))
-    (if executor (.set_num_executors opts executor))
-    (if num-workers (.set_num_workers opts num-workers))
-    (with-configured-nimbus-connection nimbus
-      (.rebalance nimbus name opts)
-      (log-message "Topology " name " is rebalancing")
-      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
new file mode 100644
index 0000000..ed65950
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.RebalanceOptions;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.lang.String.format;
+
+public class Rebalance {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Rebalance.class);
+
+    public static void main(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("w", "wait", null, CLI.AS_INT)
+            .opt("n", "num-workers", null, CLI.AS_INT)
+            .opt("e", "executor", null, new ExecutorParser(), CLI.INTO_MAP)
+            .arg("topologyName", CLI.FIRST_WINS)
+            .parse(args);
+        final String name = (String) cl.get("topologyName");
+        final RebalanceOptions rebalanceOptions = new RebalanceOptions();
+        Integer wait = (Integer) cl.get("w");
+        Integer numWorkers = (Integer) cl.get("n");
+        Map<String, Integer> numExecutors = (Map<String, Integer>) cl.get("e");
+
+        if (null != wait) {
+            rebalanceOptions.set_wait_secs(wait);
+        }
+        if (null != numWorkers) {
+            rebalanceOptions.set_num_workers(numWorkers);
+        }
+        if (null != numExecutors) {
+            rebalanceOptions.set_num_executors(numExecutors);
+        }
+
+        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+            @Override
+            public void run(Nimbus.Client nimbus) throws Exception {
+                nimbus.rebalance(name, rebalanceOptions);
+                LOG.info("Topology {} is rebalancing", name);
+            }
+        });
+    }
+
+
+    static final class ExecutorParser implements CLI.Parse {
+
+        @Override
+        public Object parse(String value) {
+            try {
+                int splitIndex = value.lastIndexOf('=');
+                String componentName = value.substring(0, splitIndex);
+                Integer parallelism = Integer.parseInt(value.substring(splitIndex + 1));
+                Map<String, Integer> result = new HashMap<String, Integer>();
+                result.put(componentName, parallelism);
+                return result;
+            } catch (Throwable ex) {
+                throw new IllegalArgumentException(
+                    format("Failed to parse '%s' correctly. Expected in <component>=<parallelism> format", value), ex);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
new file mode 100644
index 0000000..cec4958
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.storm.command;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class RebalanceTest {
+
+    @Test
+    public void testParser() throws Exception {
+        Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser();
+        Map<String, Integer> componentParallelism = (Map<String, Integer>) executorParser.parse("comp1=3");
+        Assert.assertEquals(3, (int) componentParallelism.get("comp1"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testExepction() throws Exception {
+        Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser();
+        executorParser.parse("comp1 3");
+    }
+}

Reply via email to