Merge branch 'master' into supervisor

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d46ed8fb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d46ed8fb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d46ed8fb

Branch: refs/heads/master
Commit: d46ed8fbe6bbd8116ceb60769f4b0b569bfd2d0d
Parents: 7d0551d 3812b2f
Author: xiaojian.fxj <[email protected]>
Authored: Wed Mar 30 17:06:30 2016 +0800
Committer: xiaojian.fxj <[email protected]>
Committed: Wed Mar 30 17:06:30 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   9 +
 bin/storm.py                                    |   2 +-
 docs/Documentation.md                           |  50 --
 docs/Kestrel-and-Storm.md                       |   2 +-
 docs/README.md                                  |  64 +-
 docs/Resource_Aware_Scheduler_overview.md       |   2 +
 docs/Trident-RAS-API.md                         |  52 ++
 docs/index.md                                   |  18 +-
 docs/storm-cassandra.md                         | 255 +++++++
 docs/storm-elasticsearch.md                     | 105 +++
 docs/storm-mongodb.md                           | 199 +++++
 docs/storm-mqtt.md                              | 379 ++++++++++
 docs/windows-users-guide.md                     |  21 +
 examples/storm-starter/pom.xml                  |  13 +-
 .../TridentHBaseWindowingStoreTopology.java     |  93 +++
 .../TridentWindowingInmemoryStoreTopology.java  |  98 +++
 .../main/java/org/apache/storm/flux/Flux.java   |  22 +-
 .../storm/hbase/bolt/AbstractHBaseBolt.java     |   1 -
 .../org/apache/storm/hbase/bolt/HBaseBolt.java  |  18 +-
 .../trident/windowing/HBaseWindowsStore.java    | 273 +++++++
 .../windowing/HBaseWindowsStoreFactory.java     |  55 ++
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 147 ++--
 .../apache/storm/hive/common/HiveOptions.java   |   8 +-
 .../org/apache/storm/hive/common/HiveUtils.java |  11 +-
 .../apache/storm/hive/common/HiveWriter.java    | 127 ++--
 .../apache/storm/hive/trident/HiveState.java    |  38 +-
 .../storm/hive/trident/HiveStateFactory.java    |   1 +
 .../apache/storm/hive/trident/HiveUpdater.java  |   1 +
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++++
 .../apache/storm/hive/bolt/HiveTopology.java    |   6 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  11 +-
 .../storm/hive/common/TestHiveWriter.java       |  13 +-
 .../storm/hive/trident/TridentHiveTopology.java |   2 +-
 pom.xml                                         |   6 +-
 storm-core/pom.xml                              |   2 +-
 .../apache/storm/daemon/local_supervisor.clj    |   4 +-
 .../org/apache/storm/pacemaker/pacemaker.clj    | 242 ------
 storm-core/src/jvm/org/apache/storm/Config.java |  10 +-
 .../cluster/PaceMakerStateStorageFactory.java   |   3 +-
 .../storm/daemon/supervisor/StateHeartbeat.java |   2 +-
 .../storm/daemon/supervisor/Supervisor.java     |   5 +-
 .../storm/metric/StormMetricsRegistry.java      |  12 +-
 .../org/apache/storm/pacemaker/Pacemaker.java   | 217 ++++++
 .../jvm/org/apache/storm/scheduler/Cluster.java |  23 +-
 .../org/apache/storm/scheduler/Topologies.java  |  11 +-
 .../scheduler/resource/ClusterStateData.java    | 101 ---
 .../resource/ResourceAwareScheduler.java        | 138 ++--
 .../scheduler/resource/SchedulingState.java     |  56 ++
 .../apache/storm/scheduler/resource/User.java   |  24 +-
 .../eviction/DefaultEvictionStrategy.java       |  10 +-
 .../strategies/eviction/IEvictionStrategy.java  |   9 +-
 .../DefaultSchedulingPriorityStrategy.java      |   9 +-
 .../priority/ISchedulingPriorityStrategy.java   |   9 +-
 .../DefaultResourceAwareStrategy.java           |  86 +--
 .../strategies/scheduling/IStrategy.java        |   6 +-
 .../serialization/BlowfishTupleSerializer.java  |  24 +-
 .../jvm/org/apache/storm/trident/Stream.java    | 207 +++++-
 .../apache/storm/trident/TridentTopology.java   |   4 +
 .../storm/trident/fluent/UniqueIdGen.java       |  14 +-
 .../storm/trident/operation/builtin/Debug.java  |   4 +-
 .../windowing/AbstractTridentWindowManager.java | 238 ++++++
 .../windowing/ITridentWindowManager.java        |  59 ++
 .../windowing/InMemoryTridentWindowManager.java |  72 ++
 .../trident/windowing/InMemoryWindowsStore.java | 200 +++++
 .../windowing/InMemoryWindowsStoreFactory.java  |  46 ++
 .../StoreBasedTridentWindowManager.java         | 217 ++++++
 .../trident/windowing/TridentBatchTuple.java    |  42 ++
 .../windowing/WindowTridentProcessor.java       | 265 +++++++
 .../storm/trident/windowing/WindowsState.java   |  52 ++
 .../trident/windowing/WindowsStateFactory.java  |  40 +
 .../trident/windowing/WindowsStateUpdater.java  |  81 ++
 .../storm/trident/windowing/WindowsStore.java   |  78 ++
 .../trident/windowing/WindowsStoreFactory.java  |  35 +
 .../windowing/config/BaseWindowConfig.java      |  48 ++
 .../windowing/config/SlidingCountWindow.java    |  43 ++
 .../windowing/config/SlidingDurationWindow.java |  44 ++
 .../windowing/config/TumblingCountWindow.java   |  43 ++
 .../config/TumblingDurationWindow.java          |  42 ++
 .../trident/windowing/config/WindowConfig.java  |  57 ++
 .../windowing/strategy/BaseWindowStrategy.java  |  32 +
 .../strategy/SlidingCountWindowStrategy.java    |  59 ++
 .../strategy/SlidingDurationWindowStrategy.java |  60 ++
 .../strategy/TumblingCountWindowStrategy.java   |  60 ++
 .../TumblingDurationWindowStrategy.java         |  60 ++
 .../windowing/strategy/WindowStrategy.java      |  45 ++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../apache/storm/windowing/TriggerHandler.java  |   2 +-
 .../storm/pacemaker_state_factory_test.clj      | 151 ----
 .../clj/org/apache/storm/pacemaker_test.clj     | 242 ------
 .../scheduler/resource_aware_scheduler_test.clj | 738 -------------------
 .../storm/PaceMakerStateStorageFactoryTest.java | 136 ++++
 .../jvm/org/apache/storm/PacemakerTest.java     | 242 ++++++
 .../resource/TestResourceAwareScheduler.java    | 725 +++++++++++++++++-
 .../TestUtilsForResourceAwareScheduler.java     |  43 +-
 .../storm/trident/TridentWindowingTest.java     | 105 +++
 95 files changed, 5981 insertions(+), 1965 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/bin/storm.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index ba3c87e,0000000..560ae3e
mode 100644,000000..100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@@ -1,64 -1,0 +1,64 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns org.apache.storm.daemon.local-supervisor
 +  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils]
 +           [org.apache.storm.utils Utils ConfigUtils]
 +           [org.apache.storm ProcessSimulator])
 +  (:use [org.apache.storm.daemon common]
 +        [org.apache.storm log])
 +  (:require [org.apache.storm.daemon [worker :as worker] ])
 +  (:require [clojure.string :as str])
 +  (:gen-class))
 +
 +(defn launch-local-worker [supervisorData stormId port workerId resources]
 +  (let [conf (.getConf supervisorData)
 +         pid (Utils/uuid)
 +        worker (worker/mk-worker conf
 +                 (.getSharedContext supervisorData)
 +                 stormId
 +                 (.getAssignmentId supervisorData)
 +                 (int port)
 +                 workerId)]
 +    (ConfigUtils/setWorkerUserWSE conf workerId "")
 +    (ProcessSimulator/registerProcess pid worker)
-     (.put (.getWorkerThreadPids supervisorData) workerId pid)
-     ))
++    (.put (.getWorkerThreadPids supervisorData) workerId pid)))
++
 +(defn shutdown-local-worker [supervisorData worker-manager workerId]
 +  (log-message "shutdown-local-worker")
 +  (let [supervisor-id (.getSupervisorId supervisorData)
 +        worker-pids (.getWorkerThreadPids supervisorData)
 +        dead-workers (.getDeadWorkers supervisorData)]
 +    (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
 +    (if (.cleanupWorker worker-manager workerId)
 +      (.remove dead-workers workerId))))
 +
 +(defn local-process []
 +  "Create a local process event"
 +  (proxy [SyncProcessEvent] []
 +    (launchLocalWorker [supervisorData stormId port workerId resources]
 +      (launch-local-worker supervisorData stormId port workerId resources))
 +    (killWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId))))
 +
 +
 +(defserverfn mk-local-supervisor [conf shared-context isupervisor]
 +  (log-message "Starting local Supervisor with conf " conf)
 +  (if (not (ConfigUtils/isLocalMode conf))
 +    (throw
 +      (IllegalArgumentException. "Cannot start server in distrubuted mode!")))
 +  (let [local-process (local-process)
 +        supervisor-server (Supervisor.)]
 +    (.setLocalSyncProcess supervisor-server local-process)
 +    (.mkSupervisor supervisor-server conf shared-context isupervisor)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
index cca3fa2,0000000..f4f40a1
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
@@@ -1,45 -1,0 +1,45 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.lang.builder.ToStringBuilder;
 +import org.apache.commons.lang.builder.ToStringStyle;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +
 +public class StateHeartbeat {
 +    private State state;
-     private LSWorkerHeartbeat hb;
++    private final LSWorkerHeartbeat hb;
 +
 +    public StateHeartbeat(State state, LSWorkerHeartbeat hb) {
 +        this.state = state;
 +        this.hb = hb;
 +    }
 +
 +    public State getState() {
 +        return this.state;
 +    }
 +
 +    public LSWorkerHeartbeat getHeartbeat() {
 +        return this.hb;
 +    }
 +
 +    @Override
 +    public String toString() {
 +        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 2b5078b,0000000..a3ad488
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@@ -1,178 -1,0 +1,177 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.StormTimer;
 +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
 +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
 +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
 +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
 +import org.apache.storm.event.EventManagerImp;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.messaging.IContext;
 +import org.apache.storm.metric.StormMetricsRegistry;
 +import org.apache.storm.scheduler.ISupervisor;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Utils;
 +import org.apache.storm.utils.VersionInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.InterruptedIOException;
 +import java.util.Collection;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +
 +public class Supervisor {
-     private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
- 
-     // TODO: to be removed after porting worker.clj. localSyncProcess is intended to start local supervisor
++    private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
++    
 +    private SyncProcessEvent localSyncProcess;
 +
 +    public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
 +        this.localSyncProcess = localSyncProcess;
 +    }
 +
 +    /**
 +     * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary
 +     * 
 +     * @param conf
 +     * @param sharedContext
 +     * @param iSupervisor
 +     * @return
 +     * @throws Exception
 +     */
 +    public SupervisorManager mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
 +        SupervisorManager supervisorManager = null;
 +        try {
 +            LOG.info("Starting Supervisor with conf {}", conf);
 +            iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
 +            String path = ConfigUtils.supervisorTmpDir(conf);
 +            FileUtils.cleanDirectory(new File(path));
 +
 +            final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
 +            Localizer localizer = supervisorData.getLocalizer();
 +
 +            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
 +            hb.run();
 +            // should synchronize supervisor so it doesn't launch anything after being down (optimization)
 +            Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
 +            supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
 +
 +            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
 +            for (String stormId : downloadedStormIds) {
 +                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
 +            }
 +            // do this after adding the references so we don't try to clean things being used
 +            localizer.startCleaner();
 +
 +            EventManagerImp syncSupEventManager = new EventManagerImp(false);
 +            EventManagerImp syncProcessManager = new EventManagerImp(false);
 +
 +            SyncProcessEvent syncProcessEvent = null;
 +            if (ConfigUtils.isLocalMode(conf)) {
 +                localSyncProcess.init(supervisorData);
 +                syncProcessEvent = localSyncProcess;
 +            } else {
 +                syncProcessEvent = new SyncProcessEvent(supervisorData);
 +            }
 +
 +            SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
 +            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
 +            RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);
 +
 +            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
 +                StormTimer eventTimer = supervisorData.getEventTimer();
 +                // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
 +                // to date even if callbacks don't all work exactly right
 +                eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
 +
 +                eventTimer.scheduleRecurring(0, Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)),
 +                        new EventManagerPushCallback(syncProcessEvent, syncProcessManager));
 +
 +                // Blob update thread. Starts with 30 seconds delay, every 30 seconds
 +                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
 +
 +                // supervisor health check
 +                eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData));
 +
 +                // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
 +                eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
 +            }
 +            LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName());
 +            supervisorManager = new SupervisorManager(supervisorData, syncSupEventManager, syncProcessManager);
 +        } catch (Throwable t) {
 +            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
 +                throw t;
 +            } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
 +                throw t;
 +            } else {
 +                LOG.error("Error on initialization of server supervisor: {}", t);
 +                Utils.exitProcess(13, "Error on initialization");
 +            }
 +        }
 +        return supervisorManager;
 +    }
 +
 +    /**
 +     * start distribute supervisor
 +     */
 +    private void launch(ISupervisor iSupervisor) {
 +        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
 +        SupervisorManager supervisorManager;
 +        try {
 +            Map<Object, Object> conf = Utils.readStormConfig();
 +            if (ConfigUtils.isLocalMode(conf)) {
 +                throw new IllegalArgumentException("Cannot start server in local mode!");
 +            }
 +            supervisorManager = mkSupervisor(conf, null, iSupervisor);
 +            if (supervisorManager != null)
 +                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
 +            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
 +            StormMetricsRegistry.startMetricsReporters(conf);
 +        } catch (Exception e) {
 +            LOG.error("Failed to start supervisor\n", e);
 +            System.exit(1);
 +        }
 +    }
 +
 +    private void registerWorkerNumGauge(String name, final Map conf) {
 +        StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
 +            @Override
 +            public Integer call() throws Exception {
 +                Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
 +                return pids.size();
 +            }
 +        });
 +    }
 +
 +    /**
 +     * supervisor daemon enter entrance
 +     *
 +     * @param args
 +     */
 +    public static void main(String[] args) {
 +        Utils.setupDefaultUncaughtExceptionHandler();
 +        Supervisor instance = new Supervisor();
 +        instance.launch(new StandaloneSupervisor());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------

Reply via email to