[STORM-1230] Port backtype.storm.process-simulator to Java.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/314d58db Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/314d58db Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/314d58db Branch: refs/heads/master Commit: 314d58db60bb4490e71055acd82978e20681c89e Parents: 35037d6 Author: zhuol <[email protected]> Authored: Thu Feb 11 17:30:55 2016 -0600 Committer: zhuol <[email protected]> Committed: Fri Feb 19 13:04:35 2016 -0600 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/supervisor.clj | 8 +- .../clj/org/apache/storm/process_simulator.clj | 49 ----------- storm-core/src/clj/org/apache/storm/testing.clj | 11 +-- .../jvm/org/apache/storm/ProcessSimulator.java | 89 ++++++++++++++++++++ 4 files changed, 99 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 21e5854..a34d461 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -28,14 +28,14 @@ (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) (:import [java.nio.file Files StandardCopyOption]) - (:import [org.apache.storm Config]) (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment]) + (:import [org.apache.storm Config ProcessSimulator]) (:import [org.apache.storm.localizer LocalResource]) (:import [org.apache.storm.event EventManagerImp]) (:use [org.apache.storm.daemon common]) (:import 
[org.apache.storm.command HealthCheck]) (:require [org.apache.storm.daemon [worker :as worker]] - [org.apache.storm [process-simulator :as psim] [cluster :as cluster]] + [org.apache.storm [cluster :as cluster]] [clojure.set :as set]) (:import [org.apache.thrift.transport TTransportException]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) @@ -311,7 +311,7 @@ as-user (conf SUPERVISOR-RUN-WORKER-AS-USER) user (ConfigUtils/getWorkerUser conf id)] (when thread-pid - (psim/kill-process thread-pid)) + (ProcessSimulator/killProcess thread-pid)) (doseq [pid pids] (if as-user (worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid)) @@ -1309,7 +1309,7 @@ port worker-id)] (ConfigUtils/setWorkerUserWSE conf worker-id "") - (psim/register-process pid worker) + (ProcessSimulator/registerProcess pid worker) (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid) )) http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/process_simulator.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/process_simulator.clj b/storm-core/src/clj/org/apache/storm/process_simulator.clj deleted file mode 100644 index fe5bc5b..0000000 --- a/storm-core/src/clj/org/apache/storm/process_simulator.clj +++ /dev/null @@ -1,49 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. 
You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.process-simulator - (:use [org.apache.storm log])) - -(def process-map (atom {})) - -(def kill-lock (Object.)) - -(defn register-process [pid shutdownable] - (swap! process-map assoc pid shutdownable)) - -(defn process-handle - [pid] - (@process-map pid)) - -(defn all-processes - [] - (vals @process-map)) - -(defn kill-process - "Uses `locking` in case cluster shuts down while supervisor is - killing a task" - [pid] - (locking kill-lock - (log-message "Killing process " pid) - (let [shutdownable (process-handle pid)] - (swap! process-map dissoc pid) - (when shutdownable - (.shutdown shutdownable))))) - -(defn kill-all-processes - [] - (doseq [pid (keys @process-map)] - (kill-process pid))) http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 7817929..80b75f3 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -21,10 +21,10 @@ [common :as common] [worker :as worker] [executor :as executor]]) - (:require [org.apache.storm [process-simulator :as psim]]) (:import [org.apache.commons.io FileUtils] [org.apache.storm.utils] - [org.apache.storm.zookeeper Zookeeper]) + [org.apache.storm.zookeeper Zookeeper] + [org.apache.storm ProcessSimulator]) (:import [java.io File]) (:import [java.util HashMap ArrayList]) (:import 
[java.util.concurrent.atomic AtomicInteger]) @@ -45,13 +45,14 @@ (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor]) (:import [org.apache.storm.tuple Tuple]) (:import [org.apache.storm Thrift]) + (:import [org.apache.storm Config]) (:import [org.apache.storm.generated StormTopology]) (:import [org.apache.storm.task TopologyContext] (org.apache.storm.messaging IContext) [org.json.simple JSONValue]) (:require [org.apache.storm [zookeeper :as zk]]) (:require [org.apache.storm.daemon.acker :as acker]) - (:use [org.apache.storm cluster util config log local-state-converter]) + (:use [org.apache.storm cluster util config log]) (:use [org.apache.storm.internal thrift])) (defn feeder-spout @@ -243,7 +244,7 @@ (.shutdown-all-workers s) ;; race condition here? will it launch the workers again? (supervisor/kill-supervisor s)) - (psim/kill-all-processes) + (ProcessSimulator/killAllProcesses) (if (not-nil? (:zookeeper cluster-map)) (do (log-message "Shutting down in process zookeeper") @@ -285,7 +286,7 @@ ([cluster-map timeout-ms] ;; wait until all workers, supervisors, and nimbus is waiting (let [supervisors @(:supervisors cluster-map) - workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes)) + workers (filter (partial satisfies? 
common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles))) daemons (concat [(:nimbus cluster-map)] supervisors http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java new file mode 100644 index 0000000..7734221 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm; +import org.apache.storm.daemon.Shutdownable; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProcessSimulator { + private static Logger LOG = LoggerFactory.getLogger(ProcessSimulator.class); + protected static Object lock = new Object(); + protected static ConcurrentHashMap<String, Shutdownable> processMap = new ConcurrentHashMap<String, Shutdownable>(); + + /** + * Register a process' handle + * + * @param pid + * @param shutdownable + */ + public static void registerProcess(String pid, Shutdownable shutdownable) { + processMap.put(pid, shutdownable); + } + + /** + * Get a process' handle + * + * @param pid + * @return + */ + protected static Shutdownable getProcessHandle(String pid) { + return processMap.get(pid); + } + + /** + * Get all process handles + * + * @return + */ + public static Collection<Shutdownable> getAllProcessHandles() { + return processMap.values(); + } + + /** + * Kill a process + * + * @param pid + */ + public static void killProcess(String pid) { + synchronized (lock) { + LOG.info("Begin killing process " + pid); + Shutdownable shutdownHandle = getProcessHandle(pid); + if (shutdownHandle != null) { + shutdownHandle.shutdown(); + } + processMap.remove(pid); + LOG.info("Successfully killing process " + pid); + } + } + + /** + * kill all processes + */ + public static void killAllProcesses() { + Set<String> pids = processMap.keySet(); + for (String pid : pids) { + killProcess(pid); + } + LOG.info("Successfully kill all processes"); + } +}
