Merge branch 'master' into supervisor

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/65ce9d2e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/65ce9d2e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/65ce9d2e

Branch: refs/heads/master
Commit: 65ce9d2e03be5f5c4defa8342bfbefe9f59adcf9
Parents: 184dc4a 81fb727
Author: xiaojian.fxj <[email protected]>
Authored: Thu Mar 10 22:57:01 2016 +0800
Committer: xiaojian.fxj <[email protected]>
Committed: Thu Mar 10 22:57:01 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   6 +
 conf/log4j2.xml                                 |   2 +-
 .../travis/print-errors-from-test-reports.py    |   4 +
 .../org/apache/storm/starter/ManualDRPC.java    |  53 ++-
 .../src/clj/org/apache/storm/LocalDRPC.clj      |  56 ---
 .../org/apache/storm/command/kill_workers.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  13 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    | 221 +-----------
 .../apache/storm/daemon/local_supervisor.clj    |   2 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |  27 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 155 ++++----
 storm-core/src/clj/org/apache/storm/testing.clj |   5 +-
 .../clj/org/apache/storm/trident/testing.clj    |   2 -
 storm-core/src/clj/org/apache/storm/ui/core.clj |  81 +++--
 .../src/clj/org/apache/storm/ui/helpers.clj     |  10 +-
 .../src/jvm/org/apache/storm/LocalDRPC.java     |  72 ++++
 .../jvm/org/apache/storm/daemon/DrpcServer.java | 357 +++++++++++++++++++
 .../storm/daemon/supervisor/ShutdownWork.java   | 124 -------
 .../daemon/supervisor/StandaloneSupervisor.java |   1 -
 .../storm/daemon/supervisor/Supervisor.java     |  35 +-
 .../storm/daemon/supervisor/SupervisorData.java |  18 -
 .../daemon/supervisor/SupervisorManger.java     |   3 -
 .../daemon/supervisor/SupervisorUtils.java      |   3 -
 .../daemon/supervisor/SyncProcessEvent.java     |   4 +-
 .../daemon/supervisor/SyncSupervisorEvent.java  |  24 +-
 .../supervisor/timer/SupervisorHealthCheck.java |   1 -
 .../daemon/supervisor/timer/UpdateBlobs.java    |   1 -
 .../storm/metric/StormMetricsRegistry.java      |  86 +++++
 .../auth/AbstractSaslClientCallbackHandler.java |  76 ++++
 .../auth/AbstractSaslServerCallbackHandler.java |  94 +++++
 .../auth/digest/ClientCallbackHandler.java      |  60 +---
 .../auth/digest/ServerCallbackHandler.java      |  61 +---
 .../auth/plain/PlainClientCallbackHandler.java  |  31 ++
 .../auth/plain/PlainSaslTransportPlugin.java    |  71 ++++
 .../auth/plain/PlainServerCallbackHandler.java  |  55 +++
 .../security/auth/plain/SaslPlainServer.java    | 158 ++++++++
 .../test/clj/org/apache/storm/drpc_test.clj     |  27 +-
 .../storm/security/auth/drpc_auth_test.clj      |   5 +-
 38 files changed, 1237 insertions(+), 769 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/command/kill_workers.clj
index 4ddc993,4e713f9..aadc9fd
--- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
+++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj
@@@ -14,10 -14,11 +14,10 @@@
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
  (ns org.apache.storm.command.kill-workers
 -  (:import [java.io File])
 +  (:import [java.io File]
-            [org.apache.storm.daemon.supervisor SupervisorUtils 
StandaloneSupervisor SupervisorData ShutdownWork])
++           [org.apache.storm.daemon.supervisor SupervisorUtils 
StandaloneSupervisor SupervisorData])
    (:use [org.apache.storm.daemon common])
    (:use [org.apache.storm util config])
 -  (:require [org.apache.storm.daemon
 -             [supervisor :as supervisor]])
    (:import [org.apache.storm.utils ConfigUtils])
    (:gen-class))
  

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index 2361817,0000000..c8ae2d6
mode 100644,000000..100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@@ -1,60 -1,0 +1,60 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns org.apache.storm.daemon.local-supervisor
-   (:import [org.apache.storm.daemon.supervisor SyncProcessEvent 
SupervisorData ShutdownWork Supervisor SupervisorUtils]
++  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent 
SupervisorData Supervisor SupervisorUtils]
 +           [org.apache.storm.utils Utils ConfigUtils]
 +           [org.apache.storm ProcessSimulator])
 +  (:use [org.apache.storm.daemon common]
 +        [org.apache.storm log])
 +  (:require [org.apache.storm.daemon [worker :as worker] ])
 +  (:require [clojure.string :as str])
 +  (:gen-class))
 +
 +(defn launch-local-worker [supervisorData stormId port workerId resources]
 +  (let [conf (.getConf supervisorData)
 +         pid (Utils/uuid)
 +        worker (worker/mk-worker conf
 +                 (.getSharedContext supervisorData)
 +                 stormId
 +                 (.getAssignmentId supervisorData)
 +                 (int port)
 +                 workerId)]
 +    (ConfigUtils/setWorkerUserWSE conf workerId "")
 +    (ProcessSimulator/registerProcess pid worker)
 +    (.put (.getWorkerThreadPids supervisorData) workerId pid)
 +    ))
 +
 +(defn shutdown-local-worker [supervisorData workerId]
 +    (log-message "shutdown-local-worker")
 +    (SupervisorUtils/shutWorker supervisorData workerId))
 +
 +(defn local-process []
 +  "Create a local process event"
 +  (proxy [SyncProcessEvent] []
 +    (launchWorker [supervisorData stormId port workerId resources]
 +      (launch-local-worker supervisorData stormId port workerId resources))
 +    (shutWorker [supervisorData workerId] (shutdown-local-worker 
supervisorData workerId))))
 +
 +
 +(defserverfn mk-local-supervisor [conf shared-context isupervisor]
 +  (log-message "Starting local Supervisor with conf " conf)
 +  (if (not (ConfigUtils/isLocalMode conf))
 +    (throw
 +      (IllegalArgumentException. "Cannot start server in distrubuted mode!")))
 +  (let [local-process (local-process)
 +        supervisor-server (Supervisor.)]
 +    (.setLocalSyncProcess supervisor-server local-process)
 +    (.mkSupervisor supervisor-server conf shared-context isupervisor)))

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 38ac3ee,ed8d980..a9a3447
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@@ -21,7 -21,7 +21,8 @@@
    (:use [org.apache.storm config util log])
    (:use [org.apache.storm.ui helpers])
    (:import [org.apache.storm StormTimer]
-            [org.apache.storm.daemon.supervisor SupervisorUtils])
++           [org.apache.storm.daemon.supervisor SupervisorUtils]
+            [org.apache.storm.metric StormMetricsRegistry])
    (:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils])
    (:import [org.slf4j LoggerFactory])
    (:import [java.util Arrays ArrayList HashSet])

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index 1bd54f7,66fc051..dd1f2df
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -24,8 -24,7 +24,8 @@@
    (:import [org.apache.commons.io FileUtils]
             [org.apache.storm.utils]
             [org.apache.storm.zookeeper Zookeeper]
 -           [org.apache.storm ProcessSimulator])
 +           [org.apache.storm ProcessSimulator]
-            [org.apache.storm.daemon.supervisor StandaloneSupervisor 
SupervisorData ShutdownWork SupervisorManger])
++           [org.apache.storm.daemon.supervisor StandaloneSupervisor 
SupervisorData SupervisorManger SupervisorUtils])
    (:import [java.io File])
    (:import [java.util HashMap ArrayList])
    (:import [java.util.concurrent.atomic AtomicInteger])
@@@ -412,15 -406,14 +412,14 @@@
  
  (defn mk-capture-shutdown-fn
    [capture-atom]
-   (let [shut-down (ShutdownWork.)]
 -  (let [existing-fn supervisor/shutdown-worker]
 -    (fn [supervisor worker-id]
 -      (let [conf (:conf supervisor)
 -            supervisor-id (:supervisor-id supervisor)
 -            port (find-worker-port conf worker-id)
 +    (fn [supervisorData workerId]
 +      (let [conf (.getConf supervisorData)
 +            supervisor-id (.getSupervisorId supervisorData)
 +            port (find-worker-port conf workerId)
              existing (get @capture-atom [supervisor-id port] 0)]
 +        (log-message "mk-capture-shutdown-fn")
          (swap! capture-atom assoc [supervisor-id port] (inc existing))
-         (.shutWorker shut-down supervisorData workerId)))))
 -        (existing-fn supervisor worker-id)))))
++        (SupervisorUtils/shutWorker supervisorData workerId))))
  
  (defmacro capture-changed-workers
    [& body]

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --cc 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
index d4ce623,0000000..4947c6f
mode 100644,000000..100644
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@@ -1,85 -1,0 +1,84 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.storm.Config;
 +import org.apache.storm.scheduler.ISupervisor;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Utils;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Map;
- import java.util.UUID;
 +
 +public class StandaloneSupervisor implements ISupervisor {
 +    private String supervisorId;
 +    private Map conf;
 +
 +    @Override
 +    public void prepare(Map stormConf, String schedulerLocalDir) {
 +        try {
 +            LocalState localState = new LocalState(schedulerLocalDir);
 +            String supervisorId = localState.getSupervisorId();
 +            if (supervisorId == null) {
 +                supervisorId = generateSupervisorId();
 +                localState.setSupervisorId(supervisorId);
 +            }
 +            this.conf = stormConf;
 +            this.supervisorId = supervisorId;
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    @Override
 +    public String getSupervisorId() {
 +        return supervisorId;
 +    }
 +
 +    @Override
 +    public String getAssignmentId() {
 +        return supervisorId;
 +    }
 +
 +    @Override
 +    // @return is vector which need be converted to be int
 +    public Object getMetadata() {
 +        Object ports = conf.get(Config.SUPERVISOR_SLOTS_PORTS);
 +        return ports;
 +    }
 +
 +    @Override
 +    public boolean confirmAssigned(int port) {
 +        return true;
 +    }
 +
 +    @Override
 +    public void killedWorker(int port) {
 +
 +    }
 +
 +    @Override
 +    public void assigned(Collection<Integer> ports) {
 +
 +    }
 +
 +    public String generateSupervisorId(){
 +        return Utils.uuid();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 847b38d,0000000..6124aef
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@@ -1,195 -1,0 +1,178 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
- import com.codahale.metrics.Gauge;
- import com.codahale.metrics.MetricRegistry;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.StormTimer;
- import org.apache.storm.daemon.metrics.MetricsUtils;
- import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
 +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
 +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
 +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
 +import org.apache.storm.event.EventManagerImp;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.messaging.IContext;
++import org.apache.storm.metric.StormMetricsRegistry;
 +import org.apache.storm.scheduler.ISupervisor;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Utils;
 +import org.apache.storm.utils.VersionInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.InterruptedIOException;
 +import java.util.Collection;
- import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
++import java.util.concurrent.Callable;
 +
 +public class Supervisor {
 +    private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
 +
-     //TODO: to be removed after porting worker.clj. localSyncProcess is 
intended to start local supervisor
++    // TODO: to be removed after porting worker.clj. localSyncProcess is 
intended to start local supervisor
 +    private SyncProcessEvent localSyncProcess;
 +
 +    public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
 +        this.localSyncProcess = localSyncProcess;
 +    }
 +
- 
 +    /**
 +     * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
 +     * 
 +     * @param conf
 +     * @param sharedContext
 +     * @param iSupervisor
 +     * @return
 +     * @throws Exception
 +     */
 +    public SupervisorManger mkSupervisor(final Map conf, IContext 
sharedContext, ISupervisor iSupervisor) throws Exception {
 +        SupervisorManger supervisorManger = null;
 +        try {
 +            LOG.info("Starting Supervisor with conf {}", conf);
 +            iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
 +            String path = ConfigUtils.supervisorTmpDir(conf);
 +            FileUtils.cleanDirectory(new File(path));
 +
 +            final SupervisorData supervisorData = new SupervisorData(conf, 
sharedContext, iSupervisor);
 +            Localizer localizer = supervisorData.getLocalizer();
 +
 +            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, 
supervisorData);
 +            hb.run();
 +            // should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
 +            Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
 +            supervisorData.getHeartbeatTimer().scheduleRecurring(0, 
heartbeatFrequency, hb);
 +
 +            Set<String> downdedStormId = 
SupervisorUtils.readDownLoadedStormIds(conf);
 +            for (String stormId : downdedStormId) {
 +                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
 +            }
 +            // do this after adding the references so we don't try to clean 
things being used
 +            localizer.startCleaner();
 +
 +            EventManagerImp syncSupEventManager = new EventManagerImp(false);
 +            EventManagerImp syncProcessManager = new EventManagerImp(false);
 +
 +            SyncProcessEvent syncProcessEvent = null;
-             if (ConfigUtils.isLocalMode(conf)){
++            if (ConfigUtils.isLocalMode(conf)) {
 +                localSyncProcess.init(supervisorData);
 +                syncProcessEvent = localSyncProcess;
-             }else{
++            } else {
 +                syncProcessEvent = new SyncProcessEvent(supervisorData);
 +            }
 +
 +            SyncSupervisorEvent syncSupervisorEvent = new 
SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, 
syncProcessManager);
 +            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
 +            RunProfilerActions runProfilerActionThread = new 
RunProfilerActions(supervisorData);
 +
 +            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
 +                StormTimer eventTimer = supervisorData.getEventTimer();
 +                // This isn't strictly necessary, but it doesn't hurt and 
ensures that the machine stays up
 +                // to date even if callbacks don't all work exactly right
 +                eventTimer.scheduleRecurring(0, 10, new 
EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
 +
 +                eventTimer.scheduleRecurring(0, 
Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)),
 +                        new EventManagerPushCallback(syncProcessEvent, 
syncProcessManager));
 +
 +                // Blob update thread. Starts with 30 seconds delay, every 30 
seconds
 +                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, 
new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
 +
 +                // supervisor health check
 +                eventTimer.scheduleRecurring(300, 300, new 
SupervisorHealthCheck(supervisorData));
 +
 +                // Launch a thread that Runs profiler commands . Starts with 
30 seconds delay, every 30 seconds
 +                eventTimer.scheduleRecurring(30, 30, new 
EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
 +            }
-             LOG.info("Starting supervisor with id {} at host {}.", 
supervisorData.getSupervisorId(), supervisorData.getHostName() );
++            LOG.info("Starting supervisor with id {} at host {}.", 
supervisorData.getSupervisorId(), supervisorData.getHostName());
 +            supervisorManger = new SupervisorManger(supervisorData, 
syncSupEventManager, syncProcessManager);
 +        } catch (Throwable t) {
 +            if 
(Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
 +                throw t;
 +            } else if 
(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
 +                throw t;
 +            } else {
 +                LOG.error("Error on initialization of server supervisor: {}", 
t);
 +                Utils.exitProcess(13, "Error on initialization");
 +            }
 +        }
 +        return supervisorManger;
 +    }
 +
 +    /**
 +     * start distribute supervisor
 +     */
 +    private void launch(ISupervisor iSupervisor) {
 +        LOG.info("Starting supervisor for storm version '{}'.", 
VersionInfo.getVersion());
 +        SupervisorManger supervisorManager;
 +        try {
 +            Map<Object, Object> conf = Utils.readStormConfig();
 +            if (ConfigUtils.isLocalMode(conf)) {
 +                throw new IllegalArgumentException("Cannot start server in 
local mode!");
 +            }
 +            supervisorManager = mkSupervisor(conf, null, iSupervisor);
 +            if (supervisorManager != null)
 +                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
 +            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
-             startMetricsReporters(conf);
++            StormMetricsRegistry.startMetricsReporters(conf);
 +        } catch (Exception e) {
 +            LOG.error("Failed to start supervisor\n", e);
 +            System.exit(1);
 +        }
 +    }
 +
-     // To be removed
 +    private void registerWorkerNumGauge(String name, final Map conf) {
-         MetricRegistry metricRegistry = new MetricRegistry();
-         metricRegistry.remove(name);
-         metricRegistry.register(name, new Gauge<Integer>() {
++        StormMetricsRegistry.registerGauge("supervisor:num-slots-used-gauge", 
new Callable<Integer>() {
 +            @Override
-             public Integer getValue() {
++            public Integer call() throws Exception {
 +                Collection<String> pids = 
SupervisorUtils.supervisorWorkerIds(conf);
 +                return pids.size();
 +            }
 +        });
 +    }
 +
-     // To be removed
-     private void startMetricsReporters(Map conf) {
-         List<PreparableReporter> preparableReporters = 
MetricsUtils.getPreparableReporters(conf);
-         for (PreparableReporter reporter : preparableReporters) {
-             reporter.prepare(new MetricRegistry(), conf);
-             reporter.start();
-         }
-         LOG.info("Started statistics report plugin...");
-     }
- 
 +    /**
 +     * supervisor daemon enter entrance
 +     *
 +     * @param args
 +     */
 +    public static void main(String[] args) {
 +        Utils.setupDefaultUncaughtExceptionHandler();
 +        Supervisor instance = new Supervisor();
 +        instance.launch(new StandaloneSupervisor());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --cc 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index be39b4e,0000000..be79847
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@@ -1,267 -1,0 +1,249 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.storm.Config;
 +import org.apache.storm.StormTimer;
 +import org.apache.storm.cluster.ClusterStateContext;
 +import org.apache.storm.cluster.ClusterUtils;
 +import org.apache.storm.cluster.DaemonType;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.container.cgroup.CgroupManager;
 +import org.apache.storm.generated.LocalAssignment;
 +import org.apache.storm.generated.ProfileRequest;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.messaging.IContext;
 +import org.apache.storm.scheduler.ISupervisor;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Utils;
 +import org.apache.storm.utils.VersionInfo;
- import org.apache.zookeeper.ZooDefs;
 +import org.apache.zookeeper.data.ACL;
 +import org.eclipse.jetty.util.ConcurrentHashSet;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.net.UnknownHostException;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +public class SupervisorData {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorData.class);
 +
 +    private final Map conf;
 +    private final IContext sharedContext;
 +    private volatile boolean active;
 +    private ISupervisor iSupervisor;
 +    private Utils.UptimeComputer upTime;
 +    private String stormVersion;
- 
 +    private ConcurrentHashMap<String, String> workerThreadPids; // for local 
mode
- 
 +    private IStormClusterState stormClusterState;
- 
 +    private LocalState localState;
- 
 +    private String supervisorId;
- 
 +    private String assignmentId;
- 
 +    private String hostName;
- 
 +    // used for reporting used ports when heartbeating
 +    private AtomicReference<Map<Long, LocalAssignment>> currAssignment;
- 
 +    private StormTimer heartbeatTimer;
- 
 +    private StormTimer eventTimer;
- 
 +    private StormTimer blobUpdateTimer;
- 
 +    private Localizer localizer;
- 
 +    private AtomicReference<Map<String, Map<String, Object>>> 
assignmentVersions;
- 
 +    private AtomicInteger syncRetry;
- 
 +    private final Object downloadLock = new Object();
- 
 +    private AtomicReference<Map<String, List<ProfileRequest>>> 
stormIdToProfileActions;
- 
 +    private CgroupManager resourceIsolationManager;
- 
 +    private ConcurrentHashSet<String> deadWorkers;
 +
 +    public SupervisorData(Map conf, IContext sharedContext, ISupervisor 
iSupervisor) {
 +        this.conf = conf;
 +        this.sharedContext = sharedContext;
 +        this.iSupervisor = iSupervisor;
 +        this.active = true;
 +        this.upTime = Utils.makeUptimeComputer();
 +        this.stormVersion = VersionInfo.getVersion();
 +        this.workerThreadPids = new ConcurrentHashMap<String, String>();
 +        this.deadWorkers = new ConcurrentHashSet();
 +
 +        List<ACL> acls = null;
 +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
 +            acls = SupervisorUtils.supervisorZkAcls();
 +        }
 +
 +        try {
 +            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, 
acls, new ClusterStateContext(DaemonType.SUPERVISOR));
 +        } catch (Exception e) {
 +            LOG.error("supervisor can't create stormClusterState");
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +        try {
 +            this.localState = ConfigUtils.supervisorState(conf);
 +            this.localizer = Utils.createLocalizer(conf, 
ConfigUtils.supervisorLocalDir(conf));
 +        } catch (IOException e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +        this.supervisorId = iSupervisor.getSupervisorId();
 +        this.assignmentId = iSupervisor.getAssignmentId();
 +
 +        try {
 +            this.hostName = Utils.hostname(conf);
 +        } catch (UnknownHostException e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +        this.currAssignment = new AtomicReference<Map<Long, 
LocalAssignment>>(new HashMap<Long,LocalAssignment>());
 +
 +        this.heartbeatTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
 +
 +        this.eventTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
 +
 +        this.blobUpdateTimer = new StormTimer("blob-update-timer", new 
DefaultUncaughtExceptionHandler());
 +
 +        this.assignmentVersions = new AtomicReference<Map<String, Map<String, 
Object>>>(new HashMap<String, Map<String, Object>>());
 +        this.syncRetry = new AtomicInteger(0);
 +        this.stormIdToProfileActions = new AtomicReference<Map<String, 
List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
 +        if 
(Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), 
false)) {
 +            try {
 +                this.resourceIsolationManager = (CgroupManager) 
Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
 +                this.resourceIsolationManager.prepare(conf);
 +                LOG.info("Using resource isolation plugin {} {}", 
conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
 +            } catch (IOException e) {
 +                throw Utils.wrapInRuntime(e);
 +            }
 +        } else {
 +            this.resourceIsolationManager = null;
 +        }
 +    }
 +
 +    public AtomicReference<Map<String, List<ProfileRequest>>> 
getStormIdToProfileActions() {
 +        return stormIdToProfileActions;
 +    }
 +
 +    public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> 
stormIdToProfileActions) {
 +        this.stormIdToProfileActions.set(stormIdToProfileActions);
 +    }
 +
 +    public Map getConf() {
 +        return conf;
 +    }
 +
 +    public IContext getSharedContext() {
 +        return sharedContext;
 +    }
 +
 +    public boolean isActive() {
 +        return active;
 +    }
 +
 +    public void setActive(boolean active) {
 +        this.active = active;
 +    }
 +
 +    public ISupervisor getiSupervisor() {
 +        return iSupervisor;
 +    }
 +
 +    public Utils.UptimeComputer getUpTime() {
 +        return upTime;
 +    }
 +
 +    public String getStormVersion() {
 +        return stormVersion;
 +    }
 +
 +    public ConcurrentHashMap<String, String> getWorkerThreadPids() {
 +        return workerThreadPids;
 +    }
 +
 +    public IStormClusterState getStormClusterState() {
 +        return stormClusterState;
 +    }
 +
 +    public LocalState getLocalState() {
 +        return localState;
 +    }
 +
 +    public String getSupervisorId() {
 +        return supervisorId;
 +    }
 +
 +    public String getAssignmentId() {
 +        return assignmentId;
 +    }
 +
 +    public String getHostName() {
 +        return hostName;
 +    }
 +
 +    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
 +        return currAssignment;
 +    }
 +
 +    public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
 +        this.currAssignment.set(currAssignment);
 +    }
 +
 +    public StormTimer getHeartbeatTimer() {
 +        return heartbeatTimer;
 +    }
 +
 +    public StormTimer getEventTimer() {
 +        return eventTimer;
 +    }
 +
 +    public StormTimer getBlobUpdateTimer() {
 +        return blobUpdateTimer;
 +    }
 +
 +    public Localizer getLocalizer() {
 +        return localizer;
 +    }
 +
 +    public void setLocalizer(Localizer localizer) {
 +        this.localizer = localizer;
 +    }
 +
 +    public AtomicInteger getSyncRetry() {
 +        return syncRetry;
 +    }
 +
 +    public AtomicReference<Map<String, Map<String, Object>>> 
getAssignmentVersions() {
 +        return assignmentVersions;
 +    }
 +
 +    public void setAssignmentVersions(Map<String, Map<String, Object>> 
assignmentVersions) {
 +        this.assignmentVersions.set(assignmentVersions);
 +    }
 +
 +    public CgroupManager getResourceIsolationManager() {
 +        return resourceIsolationManager;
 +    }
 +
 +    public ConcurrentHashSet getDeadWorkers() {
 +        return deadWorkers;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --cc 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
index 6578529,0000000..26f0aae
mode 100644,000000..100644
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java
@@@ -1,100 -1,0 +1,97 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.storm.event.EventManager;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.Collection;
 +import java.util.Map;
 +
 +public class SupervisorManger implements SupervisorDaemon, DaemonCommon, 
Runnable {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorManger.class);
- 
 +    private final EventManager eventManager;
- 
 +    private final EventManager processesEventManager;
- 
 +    private SupervisorData supervisorData;
 +
 +    public SupervisorManger(SupervisorData supervisorData, EventManager 
eventManager, EventManager processesEventManager) {
 +        this.eventManager = eventManager;
 +        this.supervisorData = supervisorData;
 +        this.processesEventManager = processesEventManager;
 +    }
 +
 +    public void shutdown() {
 +        LOG.info("Shutting down supervisor{}", 
supervisorData.getSupervisorId());
 +        supervisorData.setActive(false);
 +        try {
 +            supervisorData.getHeartbeatTimer().close();
 +            supervisorData.getEventTimer().close();
 +            supervisorData.getBlobUpdateTimer().close();
 +            eventManager.close();
 +            processesEventManager.close();
 +        } catch (Exception e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +        supervisorData.getStormClusterState().disconnect();
 +    }
 +
 +    @Override
 +    public void shutdownAllWorkers() {
 +
 +        Collection<String> workerIds = 
SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
 +        try {
 +            for (String workerId : workerIds) {
 +                SupervisorUtils.shutWorker(supervisorData, workerId);
 +            }
 +        } catch (Exception e) {
 +            LOG.error("shutWorker failed");
 +            throw Utils.wrapInRuntime(e);
 +        }
 +    }
 +
 +    @Override
 +    public Map getConf() {
 +        return supervisorData.getConf();
 +    }
 +
 +    @Override
 +    public String getId() {
 +        return supervisorData.getSupervisorId();
 +    }
 +
 +    @Override
 +    public boolean isWaiting() {
 +        if (!supervisorData.isActive()) {
 +            return true;
 +        }
 +
 +        if (supervisorData.getHeartbeatTimer().isTimerWaiting() && 
supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
 +                && processesEventManager.waiting()) {
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    public void run() {
 +        shutdown();
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --cc 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index dd2a538,0000000..ae3422e
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@@ -1,356 -1,0 +1,353 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.lang.StringUtils;
- import org.apache.curator.utils.PathUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.ProcessSimulator;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +import org.apache.storm.localizer.LocalResource;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.apache.zookeeper.ZooDefs;
 +import org.apache.zookeeper.data.ACL;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.net.URLDecoder;
 +import java.util.*;
 +
 +public class SupervisorUtils {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
 +
 +    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
 +    private static SupervisorUtils _instance = INSTANCE;
- 
 +    public static void setInstance(SupervisorUtils u) {
 +        _instance = u;
 +    }
- 
 +    public static void resetInstance() {
 +        _instance = INSTANCE;
 +    }
 +
 +    public static Process workerLauncher(Map conf, String user, List<String> 
args, Map<String, String> environment, final String logPreFix,
 +            final Utils.ExitCodeCallable exitCodeCallback, File dir) throws 
IOException {
 +        if (StringUtils.isBlank(user)) {
 +            throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
 +        }
 +        String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
 +        String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +        String wl;
 +        if (StringUtils.isNotBlank(wlinitial)) {
 +            wl = wlinitial;
 +        } else {
 +            wl = stormHome + "/bin/worker-launcher";
 +        }
 +        List<String> commands = new ArrayList<>();
 +        commands.add(wl);
 +        commands.add(user);
 +        commands.addAll(args);
 +        LOG.info("Running as user: {} command: {}", user, commands);
 +        return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
 +    }
 +
 +    public static int workerLauncherAndWait(Map conf, String user, 
List<String> args, final Map<String, String> environment, final String 
logPreFix)
 +            throws IOException {
 +        int ret = 0;
 +        Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
 +        if (StringUtils.isNotBlank(logPreFix))
 +            Utils.readAndLogStream(logPreFix, process.getInputStream());
 +        try {
 +            process.waitFor();
 +        } catch (InterruptedException e) {
 +            LOG.info("{} interrupted.", logPreFix);
 +        }
 +        ret = process.exitValue();
 +        return ret;
 +    }
 +
 +    public static void setupStormCodeDir(Map conf, Map stormConf, String dir) 
throws IOException {
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
 +            String logPrefix = "setup conf for " + dir;
 +            List<String> commands = new ArrayList<>();
 +            commands.add("code-dir");
 +            commands.add(dir);
 +            workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
 +        }
 +    }
 +
 +    public static void rmrAsUser(Map conf, String id, String path) throws 
IOException {
 +        String user = Utils.getFileOwner(path);
 +        String logPreFix = "rmr " + id;
 +        List<String> commands = new ArrayList<>();
 +        commands.add("rmr");
 +        commands.add(path);
 +        SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, 
logPreFix);
 +        if (Utils.checkFileExists(path)) {
 +            throw new RuntimeException(path + " was not deleted.");
 +        }
 +    }
 +
 +    /**
 +     * Given the blob information returns the value of the uncompress field, 
handling it either being a string or a boolean value, or if it's not specified 
then
 +     * returns false
 +     * 
 +     * @param blobInfo
 +     * @return
 +     */
 +    public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
 +        return new Boolean((String) blobInfo.get("uncompress"));
 +    }
 +
 +    /**
 +     * Remove a reference to a blob when its no longer needed
 +     * 
 +     * @param blobstoreMap
 +     * @return
 +     */
 +    public static List<LocalResource> 
blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
 +        List<LocalResource> localResourceList = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> map : 
blobstoreMap.entrySet()) {
 +                LocalResource localResource = new LocalResource(map.getKey(), 
shouldUncompressBlob(map.getValue()));
 +                localResourceList.add(localResource);
 +            }
 +        }
 +        return localResourceList;
 +    }
 +
 +    /**
 +     * For each of the downloaded topologies, adds references to the blobs 
that the topologies are using. This is used to reconstruct the cache on restart.
 +     * 
 +     * @param localizer
 +     * @param stormId
 +     * @param conf
 +     */
 +    public static void addBlobReferences(Localizer localizer, String stormId, 
Map conf) throws IOException {
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
 +        List<LocalResource> localresources = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
 +        if (blobstoreMap != null) {
 +            localizer.addReferences(localresources, user, topoName);
 +        }
 +    }
 +
 +    public static Set<String> readDownLoadedStormIds(Map conf) throws 
IOException {
 +        Set<String> stormIds = new HashSet<>();
 +        String path = ConfigUtils.supervisorStormDistRoot(conf);
 +        Collection<String> rets = Utils.readDirContents(path);
 +        for (String ret : rets) {
 +            stormIds.add(URLDecoder.decode(ret));
 +        }
 +        return stormIds;
 +    }
 +
 +    public static Collection<String> supervisorWorkerIds(Map conf) {
 +        String workerRoot = ConfigUtils.workerRoot(conf);
 +        return Utils.readDirContents(workerRoot);
 +    }
 +
 +    public static boolean doRequiredTopoFilesExist(Map conf, String stormId) 
throws IOException {
 +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
 +        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
 +        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
 +        if (!Utils.checkFileExists(stormroot))
 +            return false;
 +        if (!Utils.checkFileExists(stormcodepath))
 +            return false;
 +        if (!Utils.checkFileExists(stormconfpath))
 +            return false;
 +        if (ConfigUtils.isLocalMode(conf) || 
Utils.checkFileExists(stormjarpath))
 +            return true;
 +        return false;
 +    }
 +
 +    /**
 +     * Returns map from worr id to heartbeat
 +     *
 +     * @param conf
 +     * @return
 +     * @throws Exception
 +     */
 +    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map 
conf) throws Exception {
 +        return _instance.readWorkerHeartbeatsImpl(conf);
 +    }
 +
 +    public  Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) 
throws Exception {
 +        Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 +
 +        Collection<String> workerIds = 
SupervisorUtils.supervisorWorkerIds(conf);
 +
 +        for (String workerId : workerIds) {
 +            LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
 +            // ATTENTION: whb can be null
 +            workerHeartbeats.put(workerId, whb);
 +        }
 +        return workerHeartbeats;
 +    }
 +
 +
 +    /**
 +     * get worker heartbeat by workerId
 +     *
 +     * @param conf
 +     * @param workerId
 +     * @return
 +     * @throws IOException
 +     */
 +    public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String 
workerId) {
 +        return _instance.readWorkerHeartbeatImpl(conf, workerId);
 +    }
 +
 +    public  LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String 
workerId) {
 +        try {
 +            LocalState localState = ConfigUtils.workerState(conf, workerId);
 +            return localState.getWorkerHeartBeat();
 +        } catch (Exception e) {
 +            LOG.warn("Failed to read local heartbeat for workerId : 
{},Ignoring exception.", workerId, e);
 +            return null;
 +        }
 +    }
 +
 +    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, 
Map conf) {
 +        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
 +    }
 +
 +    public  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, 
Map conf) {
 +        boolean result = false;
 +        if ((now - whb.get_time_secs()) > 
Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
 +            result = true;
 +        }
 +        return result;
 +    }
 +
 +    public static String javaCmd(String cmd) {
 +        return _instance.javaCmdImpl(cmd);
 +    }
 +
 +    public String javaCmdImpl(String cmd) {
 +        String ret = null;
 +        String javaHome = System.getenv().get("JAVA_HOME");
 +        if (StringUtils.isNotBlank(javaHome)) {
 +            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + 
Utils.FILE_PATH_SEPARATOR + cmd;
 +        } else {
 +            ret = cmd;
 +        }
 +        return ret;
 +    }
 +    
 +    public final static List<ACL> supervisorZkAcls() {
 +        final List<ACL> acls = new ArrayList<>();
 +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
 +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), 
ZooDefs.Ids.ANYONE_ID_UNSAFE));
 +        return acls;
 +    }
 +
 +    public static void shutWorker(SupervisorData supervisorData, String 
workerId) throws IOException, InterruptedException {
 +        LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), 
workerId);
 +        Map conf = supervisorData.getConf();
 +        Collection<String> pids = 
Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
 +        Integer shutdownSleepSecs = 
Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
 +        Boolean asUser = 
Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +        String user = ConfigUtils.getWorkerUser(conf, workerId);
 +        String threadPid = supervisorData.getWorkerThreadPids().get(workerId);
 +        if (StringUtils.isNotBlank(threadPid)) {
 +            ProcessSimulator.killProcess(threadPid);
 +        }
 +
 +        for (String pid : pids) {
 +            if (asUser) {
 +                List<String> commands = new ArrayList<>();
 +                commands.add("signal");
 +                commands.add(pid);
 +                commands.add("15");
 +                String logPrefix = "kill -15 " + pid;
 +                SupervisorUtils.workerLauncherAndWait(conf, user, commands, 
null, logPrefix);
 +            } else {
 +                Utils.killProcessWithSigTerm(pid);
 +            }
 +        }
 +
 +        if (pids.size() > 0) {
 +            LOG.info("Sleep {} seconds for execution of cleanup threads on 
worker.", shutdownSleepSecs);
 +            Time.sleepSecs(shutdownSleepSecs);
 +        }
 +
 +        for (String pid : pids) {
 +            if (asUser) {
 +                List<String> commands = new ArrayList<>();
 +                commands.add("signal");
 +                commands.add(pid);
 +                commands.add("9");
 +                String logPrefix = "kill -9 " + pid;
 +                SupervisorUtils.workerLauncherAndWait(conf, user, commands, 
null, logPrefix);
 +            } else {
 +                Utils.forceKillProcess(pid);
 +            }
 +            String path = ConfigUtils.workerPidPath(conf, workerId, pid);
 +            if (asUser) {
 +                SupervisorUtils.rmrAsUser(conf, workerId, path);
 +            } else {
 +                try {
 +                    LOG.debug("Removing path {}", path);
 +                    new File(path).delete();
 +                } catch (Exception e) {
 +                    // on windows, the supervisor may still holds the lock on 
the worker directory
 +                    // ignore
 +                }
 +            }
 +        }
 +        tryCleanupWorker(conf, supervisorData, workerId);
 +        LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), 
workerId);
 +
 +    }
 +
 +    public static void tryCleanupWorker(Map conf, SupervisorData 
supervisorData, String workerId) {
 +        try {
 +            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +            if (Utils.checkFileExists(workerRoot)) {
 +                if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
 +                } else {
 +                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, 
workerId));
 +                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, 
workerId));
 +                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, 
workerId));
 +                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
 +                }
 +                ConfigUtils.removeWorkerUserWSE(conf, workerId);
 +                supervisorData.getDeadWorkers().remove(workerId);
 +            }
 +            if 
(Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), 
false)){
 +                
supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId);
 +            }
 +        } catch (IOException e) {
 +            LOG.warn("Failed to cleanup worker {}. Will retry later", 
workerId, e);
 +        } catch (RuntimeException e) {
 +            LOG.warn("Failed to cleanup worker {}. Will retry later", 
workerId, e);
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --cc 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index cf26896,0000000..068c442
mode 100644,000000..100644
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@@ -1,674 -1,0 +1,672 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.container.cgroup.CgroupManager;
 +import org.apache.storm.generated.ExecutorInfo;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +import org.apache.storm.generated.LocalAssignment;
 +import org.apache.storm.generated.WorkerResources;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.eclipse.jetty.util.ConcurrentHashSet;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.yaml.snakeyaml.Yaml;
 +
 +import java.io.File;
 +import java.io.FileWriter;
 +import java.io.IOException;
 +import java.util.*;
 +
 +/**
 + * 1. to kill are those in allocated that are dead or disallowed 2. kill the 
ones that should be dead - read pids, kill -9 and individually remove file - rmr
 + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of 
the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
 + * ids, write new "approved workers" to LS 5. create local dir for worker id 
5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
 + * launch
 + */
 +public class SyncProcessEvent implements Runnable {
 +
 +    private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
 +
    // Local state handle read from supervisorData in init(); used by run().
    private  LocalState localState;
    // Shared supervisor context; set via init() (see the no-arg constructor).
    private  SupervisorData supervisorData;
    // Sentinel executor id for the implicit system executor; it is stripped
    // from heartbeats before comparing against assignments (matchesAssignment).
    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
 +
    /**
     * Exit-code callback attached to a launched worker process: when the
     * process exits, logs the code and records the worker id in the shared
     * dead-workers set so a later sync pass cleans it up.
     */
    private class ProcessExitCallback implements Utils.ExitCodeCallable {
        private final String logPrefix;
        private final String workerId;

        public ProcessExitCallback(String logPrefix, String workerId) {
            this.logPrefix = logPrefix;
            this.workerId = workerId;
        }

        // No-arg overload carries no information; intentionally a no-op.
        @Override
        public Object call() throws Exception {
            return null;
        }

        @Override
        public Object call(int exitCode) {
            LOG.info("{} exited with code: {}", logPrefix, exitCode);
            supervisorData.getDeadWorkers().add(workerId);
            return null;
        }
    }
 +
    // No-arg constructor: caller is expected to invoke init(SupervisorData)
    // before use (fields stay null until then).
    public SyncProcessEvent(){

    }
    /** Convenience constructor; delegates to {@link #init(SupervisorData)}. */
    public SyncProcessEvent(SupervisorData supervisorData) {
        init(supervisorData);
    }
 +
    // TODO: a separate init() exists only for the local supervisor; remove it
    // once worker.clj has been ported to Java.
    /** Wires in the shared supervisor context and caches its local state. */
    public void init(SupervisorData supervisorData){
        this.supervisorData = supervisorData;
        this.localState = supervisorData.getLocalState();
    }
 +
 +
    /**
     * One pass of the process-sync loop:
     * 1. classify every local worker against the assigned executors;
     * 2. shut down workers whose state is not VALID;
     * 3. generate fresh worker ids for unsatisfied assignments and start them;
     * 4. write the merged "approved workers" map back to local state;
     * 5. wait for the newly launched workers to heartbeat.
     */
    @Override
    public void run() {
        LOG.debug("Syncing processes");
        try {
            Map conf = supervisorData.getConf();
            Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();

            // Treat a missing assignments map as "nothing assigned".
            if (assignedExecutors == null) {
                assignedExecutors = new HashMap<>();
            }
            int now = Time.currentTimeSecs();

            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now);

            // Workers with VALID heartbeats keep their id and port.
            Set<String> keeperWorkerIds = new HashSet<>();
            Set<Integer> keepPorts = new HashSet<>();
            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
                StateHeartbeat stateHeartbeat = entry.getValue();
                if (stateHeartbeat.getState() == State.VALID) {
                    keeperWorkerIds.add(entry.getKey());
                    keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
                }
            }
            // Assignments not covered by a kept port need a brand-new worker id.
            Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts);
            Map<Integer, String> newWorkerIds = new HashMap<>();
            for (Integer port : reassignExecutors.keySet()) {
                newWorkerIds.put(port, Utils.uuid());
            }
            LOG.debug("Syncing processes");
            LOG.debug("Assigned executors: {}", assignedExecutors);
            LOG.debug("Allocated: {}", localWorkerStats);

            // Kill everything that is dead, timed out, or disallowed.
            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
                StateHeartbeat stateHeartbeat = entry.getValue();
                if (stateHeartbeat.getState() != State.VALID) {
                    LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
                            stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
                    shutWorker(supervisorData, entry.getKey());
                }
            }
            // start new workers
            Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors);

            // Approved workers = survivors plus the freshly started ones.
            Map<String, Integer> allWorkerPortToIds = new HashMap<>();
            Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
            for (String keeper : keeperWorkerIds) {
                allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper));
            }
            allWorkerPortToIds.putAll(newWorkerPortToIds);
            localState.setApprovedWorkers(allWorkerPortToIds);
            waitForWorkersLaunch(conf, newWorkerPortToIds.keySet());

        } catch (Exception e) {
            LOG.error("Failed Sync Process", e);
            throw Utils.wrapInRuntime(e);
        }

    }
 +
 +    protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) 
throws Exception {
 +        int startTime = Time.currentTimeSecs();
 +        int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS);
 +        for (String workerId : workerIds) {
 +            LocalState localState = ConfigUtils.workerState(conf, workerId);
 +            while (true) {
 +                LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
 +                if (hb != null || (Time.currentTimeSecs() - startTime) > 
timeOut)
 +                    break;
 +                LOG.info("{} still hasn't started", workerId);
 +                Time.sleep(500);
 +            }
 +            if (localState.getWorkerHeartBeat() == null) {
 +                LOG.info("Worker {} failed to start", workerId);
 +            }
 +        }
 +    }
 +
 +    protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, 
LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
 +        Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>();
 +        reassignExecutors.putAll(assignExecutors);
 +        for (Integer port : keepPorts) {
 +            reassignExecutors.remove(port);
 +        }
 +        return reassignExecutors;
 +    }
 +    
    /**
     * Returns a map from worker id to (state, heartbeat). A null heartbeat
     * means the worker is dead; the state classifies each worker as
     * NOT_STARTED, DISALLOWED, TIMED_OUT, or VALID.
     *
     * @param supervisorData shared supervisor context
     * @param assignedExecutors port to assignment, used to validate heartbeats
     * @param now current supervisor time in seconds
     * @return worker id to state/heartbeat pair
     * @throws Exception on local-state read failure
     */
    public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
        Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
        Map conf = supervisorData.getConf();
        LocalState localState = supervisorData.getLocalState();
        Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf);
        Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
        Set<String> approvedIds = new HashSet<>();
        if (approvedWorkers != null) {
            approvedIds.addAll(approvedWorkers.keySet());
        }
        for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
            String workerId = entry.getKey();
            LSWorkerHeartbeat whb = entry.getValue();
            State state;
            if (whb == null) {
                // No heartbeat yet: the worker never came up.
                state = State.NOT_STARTED;
            } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
                // Unknown worker id, or heartbeat disagrees with the assignment.
                state = State.DISALLOWED;
            } else if (supervisorData.getDeadWorkers().contains(workerId)) {
                // The exit-code callback already reported this process dead.
                LOG.info("Worker Process {} has died", workerId);
                state = State.TIMED_OUT;
            } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
                state = State.TIMED_OUT;
            } else {
                state = State.VALID;
            }
            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
            workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
        }
        return workerIdHbstate;
    }
 +
 +    protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, 
LocalAssignment> assignedExecutors) {
 +        LocalAssignment localAssignment = 
assignedExecutors.get(whb.get_port());
 +        if (localAssignment == null || 
!localAssignment.get_topology_id().equals(whb.get_topology_id())) {
 +            return false;
 +        }
 +        List<ExecutorInfo> executorInfos = new ArrayList<>();
 +        executorInfos.addAll(whb.get_executors());
 +        // remove SYSTEM_EXECUTOR_ID
 +        executorInfos.remove(SYSTEM_EXECUTOR_INFO);
 +        List<ExecutorInfo> localExecuorInfos = 
localAssignment.get_executors();
 +
 +        if (localExecuorInfos.size() != executorInfos.size())
 +            return false;
 +
 +        for (ExecutorInfo executorInfo : localExecuorInfos){
 +            if (!localExecuorInfos.contains(executorInfo))
 +                return false;
 +        }
 +        return true;
 +    }
 +
    /**
     * launch a worker in local mode.
     *
     * <p>Intentionally a stub for now; to be implemented once worker.clj is
     * ported to Java (see the class TODO on init()).
     */
    protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
        // port this function after porting worker to java
    }
 +
 +    protected String getWorkerClassPath(String stormJar, Map stormConf) {
 +        List<String> topoClasspath = new ArrayList<>();
 +        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
 +
 +        if (object instanceof List) {
 +            topoClasspath.addAll((List<String>) object);
 +        } else if (object instanceof String){
 +            topoClasspath.add((String)object);
 +        }else {
 +            //ignore
 +        }
 +        String classPath = Utils.workerClasspath();
 +        String classAddPath = Utils.addToClasspath(classPath, 
Arrays.asList(stormJar));
 +        return Utils.addToClasspath(classAddPath, topoClasspath);
 +    }
 +
 +    /**
 +     * "Generates runtime childopts by replacing keys with topology-id, 
worker-id, port, mem-onheap"
 +     * 
 +     * @param value
 +     * @param workerId
 +     * @param stormId
 +     * @param port
 +     * @param memOnheap
 +     */
 +    public List<String> substituteChildopts(Object value, String workerId, 
String stormId, Long port, int memOnheap) {
 +        List<String> rets = new ArrayList<>();
 +        if (value instanceof String) {
 +            String string = (String) value;
 +            string = string.replace("%ID%", String.valueOf(port));
 +            string = string.replace("%WORKER-ID%", workerId);
 +            string = string.replace("%TOPOLOGY-ID%", stormId);
 +            string = string.replace("%WORKER-PORT%", String.valueOf(port));
 +            string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +            String[] strings = string.split("\\s+");
 +            rets.addAll(Arrays.asList(strings));
 +        } else if (value instanceof List) {
 +            List<Object> objects = (List<Object>) value;
 +            for (Object object : objects) {
 +                String str = (String)object;
 +                str = str.replace("%ID%", String.valueOf(port));
 +                str = str.replace("%WORKER-ID%", workerId);
 +                str = str.replace("%TOPOLOGY-ID%", stormId);
 +                str = str.replace("%WORKER-PORT%", String.valueOf(port));
 +                str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap));
 +                rets.add(str);
 +            }
 +        }
 +        return rets;
 +    }
 +
 +
 +
 +    /**
 +     * launch a worker in distributed mode
 +     * supervisorId for testing
 +     * @throws IOException
 +     */
 +    protected void launchWorker(Map conf, String supervisorId, String 
assignmentId, String stormId, Long port, String workerId,
 +            WorkerResources resources, CgroupManager cgroupManager, 
ConcurrentHashSet deadWorkers) throws IOException {
 +
 +        Boolean runWorkerAsUser = 
Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
 +        String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +        String stormOptions = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
 +        String stormConfFile = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
 +        String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
 +
 +        String stormLogDir = ConfigUtils.getLogDir();
 +        String stormLogConfDir = (String) 
(conf.get(Config.STORM_LOG4J2_CONF_DIR));
 +
 +        String stormLog4j2ConfDir;
 +        if (StringUtils.isNotBlank(stormLogConfDir)) {
 +            if (Utils.isAbsolutePath(stormLogConfDir)) {
 +                stormLog4j2ConfDir = stormLogConfDir;
 +            } else {
 +                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + 
stormLogConfDir;
 +            }
 +        } else {
 +            stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + 
"log4j2";
 +        }
 +
 +        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +
 +        String jlp = jlp(stormRoot, conf);
 +
 +        String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
 +
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +        String workerClassPath = getWorkerClassPath(stormJar, stormConf);
 +
 +        Object topGcOptsObject = 
stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
 +        List<String> topGcOpts = new ArrayList<>();
 +        if (topGcOptsObject instanceof String) {
 +            topGcOpts.add((String) topGcOptsObject);
 +        } else if (topGcOptsObject instanceof List) {
 +            topGcOpts.addAll((List<String>) topGcOptsObject);
 +        }
 +
 +        int memOnheap = 0;
 +        if (resources.get_mem_on_heap() > 0) {
 +            memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
 +        } else {
 +            //set the default heap memory size for supervisor-test
 +            memOnheap = 
Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
 +        }
 +
 +        int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
 +
 +        int cpu = (int) Math.ceil(resources.get_cpu());
 +
 +        List<String> gcOpts = null;
 +
 +        if (topGcOpts != null) {
 +            gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, 
memOnheap);
 +        } else {
 +            gcOpts = 
substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, 
port, memOnheap);
 +        }
 +
 +        Object topoWorkerLogwriterObject = 
stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
 +        List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
 +        if (topoWorkerLogwriterObject instanceof String) {
 +            topoWorkerLogwriterChildopts.add((String) 
topoWorkerLogwriterObject);
 +        } else if (topoWorkerLogwriterObject instanceof List) {
 +            topoWorkerLogwriterChildopts.addAll((List<String>) 
topoWorkerLogwriterObject);
 +        }
 +
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +
 +        String logfileName = "worker.log";
 +
 +        String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
 +
 +        String loggingSensitivity = (String) 
stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
 +        if (loggingSensitivity == null) {
 +            loggingSensitivity = "S3";
 +        }
 +
 +        List<String> workerChildopts = 
substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, 
memOnheap);
 +
 +        List<String> topWorkerChildopts = 
substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, 
stormId, port, memOnheap);
 +
 +        List<String> workerProfilerChildopts = null;
 +        if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), 
false)) {
 +            workerProfilerChildopts = 
substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, 
stormId, port, memOnheap);
 +        }else {
 +            workerProfilerChildopts = new ArrayList<>();
 +        }
 +
 +        Map<String, String> topEnvironment = new HashMap<String, String>();
 +        Map<String, String> environment = (Map<String, String>) 
stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +        if (environment != null) {
 +            topEnvironment.putAll(environment);
 +        }
 +        topEnvironment.put("LD_LIBRARY_PATH", jlp);
 +
 +        String log4jConfigurationFile = null;
 +        if (System.getProperty("os.name").startsWith("Windows") && 
!stormLog4j2ConfDir.startsWith("file:")) {
 +            log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
 +        } else {
 +            log4jConfigurationFile = stormLog4j2ConfDir;
 +        }
 +        log4jConfigurationFile = log4jConfigurationFile + 
Utils.FILE_PATH_SEPARATOR + "worker.xml";
 +
 +        List<String> commandList = new ArrayList<>();
 +        commandList.add(SupervisorUtils.javaCmd("java"));
 +        commandList.add("-cp");
 +        commandList.add(workerClassPath);
 +        commandList.addAll(topoWorkerLogwriterChildopts);
 +        commandList.add("-Dlogfile.name=" + logfileName);
 +        commandList.add("-Dstorm.home=" + stormHome);
 +        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +        commandList.add("-Dstorm.id=" + stormId);
 +        commandList.add("-Dworker.id=" + workerId);
 +        commandList.add("-Dworker.port=" + port);
 +        commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +        commandList.add("-Dlog4j.configurationFile=" + 
log4jConfigurationFile);
 +        
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +        commandList.add("org.apache.storm.LogWriter");
 +
 +        commandList.add(SupervisorUtils.javaCmd("java"));
 +        commandList.add("-server");
 +        commandList.addAll(workerChildopts);
 +        commandList.addAll(topWorkerChildopts);
 +        commandList.addAll(gcOpts);
 +        commandList.addAll(workerProfilerChildopts);
 +        commandList.add("-Djava.library.path=" + jlp);
 +        commandList.add("-Dlogfile.name=" + logfileName);
 +        commandList.add("-Dstorm.home=" + stormHome);
 +        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
 +        commandList.add("-Dstorm.conf.file=" + stormConfFile);
 +        commandList.add("-Dstorm.options=" + stormOptions);
 +        commandList.add("-Dstorm.log.dir=" + stormLogDir);
 +        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
 +        commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
 +        commandList.add("-Dlog4j.configurationFile=" + 
log4jConfigurationFile);
 +        
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
 +        commandList.add("-Dstorm.id=" + stormId);
 +        commandList.add("-Dworker.id=" + workerId);
 +        commandList.add("-Dworker.port=" + port);
 +        commandList.add("-cp");
 +        commandList.add(workerClassPath);
 +        commandList.add("org.apache.storm.daemon.worker");
 +        commandList.add(stormId);
 +        commandList.add(assignmentId);
 +        commandList.add(String.valueOf(port));
 +        commandList.add(workerId);
 +
 +        // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil 
(conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
 +        if 
(Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), 
false)) {
 +            int cgRoupMem = (int) (Math.ceil((double) 
conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
 +            int memoryValue = memoffheap + memOnheap + cgRoupMem;
 +            int cpuValue = cpu;
 +            Map<String, Number> map = new HashMap<>();
 +            map.put("cpu", cpuValue);
 +            map.put("memory", memoryValue);
 +            cgroupManager.reserveResourcesForWorker(workerId, map);
 +            commandList = cgroupManager.getLaunchCommand(workerId, 
commandList);
 +        }
 +
 +        LOG.info("Launching worker with command: {}. ", 
Utils.shellCmd(commandList));
 +        writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
 +        ConfigUtils.setWorkerUserWSE(conf, workerId, user);
 +        createArtifactsLink(conf, stormId, port, workerId);
 +
 +        String logPrefix = "Worker Process " + workerId;
 +        String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +
 +        if (deadWorkers != null)
 +            deadWorkers.remove(workerId);
 +        createBlobstoreLinks(conf, stormId, workerId);
 +
 +        ProcessExitCallback processExitCallback = new 
ProcessExitCallback(logPrefix, workerId);
 +        if (runWorkerAsUser) {
 +            List<String> args = new ArrayList<>();
 +            args.add("worker");
 +            args.add(workerDir);
 +            args.add(Utils.writeScript(workerDir, commandList, 
topEnvironment));
 +            SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, 
processExitCallback, new File(workerDir));
 +        } else {
 +            Utils.launchProcess(commandList, topEnvironment, logPrefix, 
processExitCallback, new File(workerDir));
 +        }
 +    }
 +
 +    protected String jlp(String stormRoot, Map conf) {
 +        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + 
ConfigUtils.RESOURCES_SUBDIR;
 +        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
 +        String arch = System.getProperty("os.arch");
 +        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + 
os + "-" + arch;
 +        String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + 
resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
 +        return ret;
 +    }
 +
 +    protected Map<String, Integer> startNewWorkers(Map<Integer, String> 
newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws 
IOException {
 +
 +        Map<String, Integer> newValidWorkerIds = new HashMap<>();
 +        Map conf = supervisorData.getConf();
 +        String supervisorId = supervisorData.getSupervisorId();
 +        String clusterMode = ConfigUtils.clusterMode(conf);
 +
 +        for (Map.Entry<Integer, LocalAssignment> entry : 
reassignExecutors.entrySet()) {
 +            Integer port = entry.getKey();
 +            LocalAssignment assignment = entry.getValue();
 +            String workerId = newWorkerIds.get(port);
 +            String stormId = assignment.get_topology_id();
 +            WorkerResources resources = assignment.get_resources();
 +
 +            // This condition checks for required files exist before 
launching the worker
 +            if (SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
 +                String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
 +                String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, 
workerId);
 +
 +                LOG.info("Launching worker with assignment {} for this 
supervisor {} on port {} with id {}", assignment, 
supervisorData.getSupervisorId(), port,
 +                        workerId);
 +
 +                FileUtils.forceMkdir(new File(pidsPath));
 +                FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, 
workerId)));
 +                FileUtils.forceMkdir(new File(hbPath));
 +
 +                if (clusterMode.endsWith("distributed")) {
 +                    launchWorker(conf, supervisorId, 
supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, 
resources,
 +                            supervisorData.getResourceIsolationManager(), 
supervisorData.getDeadWorkers());
 +                } else if (clusterMode.endsWith("local")) {
 +                    launchWorker(supervisorData, stormId, port.longValue(), 
workerId, resources);
 +                }
 +                newValidWorkerIds.put(workerId, port);
 +
 +            } else {
 +                LOG.info("Missing topology storm code, so can't launch worker 
with assignment {} for this supervisor {} on port {} with id {}", assignment,
 +                        supervisorData.getSupervisorId(), port, workerId);
 +            }
 +
 +        }
 +        return newValidWorkerIds;
 +    }
 +
 +    public void writeLogMetadata(Map stormconf, String user, String workerId, 
String stormId, Long port, Map conf) throws IOException {
 +        Map data = new HashMap();
 +        data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
 +        data.put("worker-id", workerId);
 +
 +        Set<String> logsGroups = new HashSet<>();
 +        //for supervisor-test
 +        if (stormconf.get(Config.LOGS_GROUPS) != null) {
 +            List<String> groups = (List<String>) 
stormconf.get(Config.LOGS_GROUPS);
 +            for (String group : groups){
 +                logsGroups.add(group);
 +            }
 +        }
 +        if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) {
 +            List<String> topGroups = (List<String>) 
stormconf.get(Config.TOPOLOGY_GROUPS);
 +            for (String group : topGroups){
 +                logsGroups.add(group);
 +            }
 +        }
 +        data.put(Config.LOGS_GROUPS, logsGroups.toArray());
 +
 +        Set<String> logsUsers = new HashSet<>();
 +        if (stormconf.get(Config.LOGS_USERS) != null) {
 +            List<String> logUsers = (List<String>) 
stormconf.get(Config.LOGS_USERS);
 +            for (String logUser : logUsers){
 +                logsUsers.add(logUser);
 +            }
 +        }
 +        if (stormconf.get(Config.TOPOLOGY_USERS) != null) {
 +            List<String> topUsers = (List<String>) 
stormconf.get(Config.TOPOLOGY_USERS);
 +            for (String logUser : topUsers){
 +                logsUsers.add(logUser);
 +            }
 +        }
 +        data.put(Config.LOGS_USERS, logsUsers.toArray());
 +        writeLogMetadataToYamlFile(stormId, port, data, conf);
 +    }
 +
 +    /**
 +     * run worker as user needs the directory to have special permissions or 
it is insecure
 +     * 
 +     * @param stormId
 +     * @param port
 +     * @param data
 +     * @param conf
 +     * @throws IOException
 +     */
 +    public void writeLogMetadataToYamlFile(String stormId, Long port, Map 
data, Map conf) throws IOException {
 +        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, 
port.intValue());
 +
 +        if (!Utils.checkFileExists(file.getParent())) {
 +            if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                FileUtils.forceMkdir(file.getParentFile());
 +                SupervisorUtils.setupStormCodeDir(conf, 
ConfigUtils.readSupervisorStormConf(conf, stormId), 
file.getParentFile().getCanonicalPath());
 +            } else {
 +                file.getParentFile().mkdirs();
 +            }
 +        }
 +        FileWriter writer = new FileWriter(file);
 +        Yaml yaml = new Yaml();
 +        try {
 +            yaml.dump(data, writer);
 +        }finally {
 +            writer.close();
 +        }
 +
 +    }
 +
 +    /**
 +     * Create a symlink from workder directory to its port artifacts directory
 +     * 
 +     * @param conf
 +     * @param stormId
 +     * @param port
 +     * @param workerId
 +     */
 +    protected void createArtifactsLink(Map conf, String stormId, Long port, 
String workerId) throws IOException {
 +        String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +        String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
 +        if (Utils.checkFileExists(workerDir)) {
 +            Utils.createSymlink(workerDir, topoDir, "artifacts", 
String.valueOf(port));
 +        }
 +    }
 +
 +    /**
 +     * Create symlinks in worker launch directory for all blobs
 +     * 
 +     * @param conf
 +     * @param stormId
 +     * @param workerId
 +     * @throws IOException
 +     */
 +    protected void createBlobstoreLinks(Map conf, String stormId, String 
workerId) throws IOException {
 +        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        List<String> blobFileNames = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {
 +                String key = entry.getKey();
 +                Map<String, Object> blobInfo = entry.getValue();
 +                String ret = null;
 +                if (blobInfo != null && blobInfo.containsKey("localname")) {
 +                    ret = (String) blobInfo.get("localname");
 +                } else {
 +                    ret = key;
 +                }
 +                blobFileNames.add(ret);
 +            }
 +        }
 +        List<String> resourceFileNames = new ArrayList<>();
 +        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
 +        resourceFileNames.addAll(blobFileNames);
 +        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for 
files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames);
 +        Utils.createSymlink(workerRoot, stormRoot, 
ConfigUtils.RESOURCES_SUBDIR);
 +        for (String fileName : blobFileNames) {
 +            Utils.createSymlink(workerRoot, stormRoot, fileName, fileName);
 +        }
 +    }
 +
    /**
     * Shut down the worker with the given id. Kept as an overridable instance
     * method so supervisor-test can intercept worker shutdown; delegates to
     * SupervisorUtils.shutWorker.
     *
     * @param supervisorData shared supervisor state
     * @param workerId       id of the worker to shut down
     * @throws IOException          if cleaning up the worker's files fails
     * @throws InterruptedException if interrupted while waiting for the worker to exit
     */
    public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{
        SupervisorUtils.shutWorker(supervisorData, workerId);
    }
 +}

Reply via email to