Merge branch 'master' into supervisor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d46ed8fb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d46ed8fb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d46ed8fb Branch: refs/heads/master Commit: d46ed8fbe6bbd8116ceb60769f4b0b569bfd2d0d Parents: 7d0551d 3812b2f Author: xiaojian.fxj <[email protected]> Authored: Wed Mar 30 17:06:30 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Wed Mar 30 17:06:30 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 9 + bin/storm.py | 2 +- docs/Documentation.md | 50 -- docs/Kestrel-and-Storm.md | 2 +- docs/README.md | 64 +- docs/Resource_Aware_Scheduler_overview.md | 2 + docs/Trident-RAS-API.md | 52 ++ docs/index.md | 18 +- docs/storm-cassandra.md | 255 +++++++ docs/storm-elasticsearch.md | 105 +++ docs/storm-mongodb.md | 199 +++++ docs/storm-mqtt.md | 379 ++++++++++ docs/windows-users-guide.md | 21 + examples/storm-starter/pom.xml | 13 +- .../TridentHBaseWindowingStoreTopology.java | 93 +++ .../TridentWindowingInmemoryStoreTopology.java | 98 +++ .../main/java/org/apache/storm/flux/Flux.java | 22 +- .../storm/hbase/bolt/AbstractHBaseBolt.java | 1 - .../org/apache/storm/hbase/bolt/HBaseBolt.java | 18 +- .../trident/windowing/HBaseWindowsStore.java | 273 +++++++ .../windowing/HBaseWindowsStoreFactory.java | 55 ++ .../org/apache/storm/hive/bolt/HiveBolt.java | 147 ++-- .../apache/storm/hive/common/HiveOptions.java | 8 +- .../org/apache/storm/hive/common/HiveUtils.java | 11 +- .../apache/storm/hive/common/HiveWriter.java | 127 ++-- .../apache/storm/hive/trident/HiveState.java | 38 +- .../storm/hive/trident/HiveStateFactory.java | 1 + .../apache/storm/hive/trident/HiveUpdater.java | 1 + .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++++ .../apache/storm/hive/bolt/HiveTopology.java | 6 +- .../apache/storm/hive/bolt/TestHiveBolt.java | 11 +- .../storm/hive/common/TestHiveWriter.java | 13 +- .../storm/hive/trident/TridentHiveTopology.java | 2 +- pom.xml | 6 +- storm-core/pom.xml | 2 +- .../apache/storm/daemon/local_supervisor.clj | 4 +- .../org/apache/storm/pacemaker/pacemaker.clj | 242 ------ storm-core/src/jvm/org/apache/storm/Config.java | 10 +- .../cluster/PaceMakerStateStorageFactory.java | 3 +- .../storm/daemon/supervisor/StateHeartbeat.java | 2 +- .../storm/daemon/supervisor/Supervisor.java | 5 +- .../storm/metric/StormMetricsRegistry.java | 12 +- .../org/apache/storm/pacemaker/Pacemaker.java | 217 ++++++ .../jvm/org/apache/storm/scheduler/Cluster.java | 23 +- .../org/apache/storm/scheduler/Topologies.java | 11 +- .../scheduler/resource/ClusterStateData.java | 101 --- .../resource/ResourceAwareScheduler.java | 138 ++-- .../scheduler/resource/SchedulingState.java | 56 ++ .../apache/storm/scheduler/resource/User.java | 24 +- .../eviction/DefaultEvictionStrategy.java | 10 +- .../strategies/eviction/IEvictionStrategy.java | 9 +- .../DefaultSchedulingPriorityStrategy.java | 9 +- .../priority/ISchedulingPriorityStrategy.java | 9 +- .../DefaultResourceAwareStrategy.java | 86 +-- .../strategies/scheduling/IStrategy.java | 6 +- .../serialization/BlowfishTupleSerializer.java | 24 +- .../jvm/org/apache/storm/trident/Stream.java | 207 +++++- .../apache/storm/trident/TridentTopology.java | 4 + .../storm/trident/fluent/UniqueIdGen.java | 14 +- .../storm/trident/operation/builtin/Debug.java | 4 +- .../windowing/AbstractTridentWindowManager.java | 238 ++++++ .../windowing/ITridentWindowManager.java | 59 ++ .../windowing/InMemoryTridentWindowManager.java | 72 ++ .../trident/windowing/InMemoryWindowsStore.java | 200 +++++ .../windowing/InMemoryWindowsStoreFactory.java | 46 ++ .../StoreBasedTridentWindowManager.java | 217 ++++++ .../trident/windowing/TridentBatchTuple.java | 42 ++ .../windowing/WindowTridentProcessor.java | 265 +++++++ .../storm/trident/windowing/WindowsState.java | 52 ++ .../trident/windowing/WindowsStateFactory.java | 40 + .../trident/windowing/WindowsStateUpdater.java | 81 ++ .../storm/trident/windowing/WindowsStore.java | 78 ++ .../trident/windowing/WindowsStoreFactory.java | 35 + .../windowing/config/BaseWindowConfig.java | 48 ++ .../windowing/config/SlidingCountWindow.java | 43 ++ .../windowing/config/SlidingDurationWindow.java | 44 ++ .../windowing/config/TumblingCountWindow.java | 43 ++ .../config/TumblingDurationWindow.java | 42 ++ .../trident/windowing/config/WindowConfig.java | 57 ++ .../windowing/strategy/BaseWindowStrategy.java | 32 + .../strategy/SlidingCountWindowStrategy.java | 59 ++ .../strategy/SlidingDurationWindowStrategy.java | 60 ++ .../strategy/TumblingCountWindowStrategy.java | 60 ++ .../TumblingDurationWindowStrategy.java | 60 ++ .../windowing/strategy/WindowStrategy.java | 45 ++ .../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +- .../apache/storm/windowing/TriggerHandler.java | 2 +- .../storm/pacemaker_state_factory_test.clj | 151 ---- .../clj/org/apache/storm/pacemaker_test.clj | 242 ------ .../scheduler/resource_aware_scheduler_test.clj | 738 ------------------- .../storm/PaceMakerStateStorageFactoryTest.java | 136 ++++ .../jvm/org/apache/storm/PacemakerTest.java | 242 ++++++ .../resource/TestResourceAwareScheduler.java | 725 +++++++++++++++++- .../TestUtilsForResourceAwareScheduler.java | 43 +- .../storm/trident/TridentWindowingTest.java | 105 +++ 95 files changed, 5981 insertions(+), 1965 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/bin/storm.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj index ba3c87e,0000000..560ae3e mode 100644,000000..100644 --- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj @@@ -1,64 -1,0 +1,64 @@@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] + [org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) + worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] + (ConfigUtils/setWorkerUserWSE conf workerId "") + (ProcessSimulator/registerProcess pid worker) - (.put (.getWorkerThreadPids supervisorData) workerId pid) - )) ++ (.put (.getWorkerThreadPids supervisorData) workerId pid))) ++ +(defn shutdown-local-worker [supervisorData worker-manager workerId] + (log-message "shutdown-local-worker") + (let [supervisor-id (.getSupervisorId supervisorData) + worker-pids (.getWorkerThreadPids supervisorData) + dead-workers (.getDeadWorkers supervisorData)] + (.shutdownWorker worker-manager supervisor-id workerId worker-pids) + (if (.cleanupWorker worker-manager workerId) + (.remove dead-workers workerId)))) + +(defn local-process [] + "Create a local process event" + (proxy [SyncProcessEvent] [] + (launchLocalWorker [supervisorData stormId port workerId resources] + (launch-local-worker supervisorData stormId port workerId resources)) + (killWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId)))) + + +(defserverfn mk-local-supervisor [conf shared-context isupervisor] + (log-message "Starting local Supervisor with conf " conf) + (if (not (ConfigUtils/isLocalMode conf)) + (throw + (IllegalArgumentException. "Cannot start server in distrubuted mode!"))) + (let [local-process (local-process) + supervisor-server (Supervisor.)] + (.setLocalSyncProcess supervisor-server local-process) + (.mkSupervisor supervisor-server conf shared-context isupervisor))) http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java index cca3fa2,0000000..f4f40a1 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java @@@ -1,45 -1,0 +1,45 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.storm.generated.LSWorkerHeartbeat; + +public class StateHeartbeat { + private State state; - private LSWorkerHeartbeat hb; ++ private final LSWorkerHeartbeat hb; + + public StateHeartbeat(State state, LSWorkerHeartbeat hb) { + this.state = state; + this.hb = hb; + } + + public State getState() { + return this.state; + } + + public LSWorkerHeartbeat getHeartbeat() { + return this.hb; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java index 2b5078b,0000000..a3ad488 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java @@@ -1,178 -1,0 +1,177 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class Supervisor { - private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); - - // TODO: to be removed after porting worker.clj. localSyncProcess is intended to start local supervisor ++ private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class); ++ + private SyncProcessEvent localSyncProcess; + + public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) { + this.localSyncProcess = localSyncProcess; + } + + /** + * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary + * + * @param conf + * @param sharedContext + * @param iSupervisor + * @return + * @throws Exception + */ + public SupervisorManager mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { + SupervisorManager supervisorManager = null; + try { + LOG.info("Starting Supervisor with conf {}", conf); + iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); + String path = ConfigUtils.supervisorTmpDir(conf); + FileUtils.cleanDirectory(new File(path)); + + final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); + Localizer localizer = supervisorData.getLocalizer(); + + SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); + hb.run(); + // should synchronize supervisor so it doesn't launch anything after being down (optimization) + Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); + supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); + + Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf); + for (String stormId : downloadedStormIds) { + SupervisorUtils.addBlobReferences(localizer, stormId, conf); + } + // do this after adding the references so we don't try to clean things being used + localizer.startCleaner(); + + EventManagerImp syncSupEventManager = new EventManagerImp(false); + EventManagerImp syncProcessManager = new EventManagerImp(false); + + SyncProcessEvent syncProcessEvent = null; + if (ConfigUtils.isLocalMode(conf)) { + localSyncProcess.init(supervisorData); + syncProcessEvent = localSyncProcess; + } else { + syncProcessEvent = new SyncProcessEvent(supervisorData); + } + + SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager); + UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData); + RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData); + + if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) { + StormTimer eventTimer = supervisorData.getEventTimer(); + // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up + // to date even if callbacks don't all work exactly right + eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager)); + + eventTimer.scheduleRecurring(0, Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)), + new EventManagerPushCallback(syncProcessEvent, syncProcessManager)); + + // Blob update thread. Starts with 30 seconds delay, every 30 seconds + supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager)); + + // supervisor health check + eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData)); + + // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds + eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager)); + } + LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName()); + supervisorManager = new SupervisorManager(supervisorData, syncSupEventManager, syncProcessManager); + } catch (Throwable t) { + if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) { + throw t; + } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) { + throw t; + } else { + LOG.error("Error on initialization of server supervisor: {}", t); + Utils.exitProcess(13, "Error on initialization"); + } + } + return supervisorManager; + } + + /** + * start distribute supervisor + */ + private void launch(ISupervisor iSupervisor) { + LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); + SupervisorManager supervisorManager; + try { + Map<Object, Object> conf = Utils.readStormConfig(); + if (ConfigUtils.isLocalMode(conf)) { + throw new IllegalArgumentException("Cannot start server in local mode!"); + } + supervisorManager = mkSupervisor(conf, null, iSupervisor); + if (supervisorManager != null) + Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager); + registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf); + StormMetricsRegistry.startMetricsReporters(conf); + } catch (Exception e) { + LOG.error("Failed to start supervisor\n", e); + System.exit(1); + } + } + + private void registerWorkerNumGauge(String name, final Map conf) { + StormMetricsRegistry.registerGauge(name, new Callable<Integer>() { + @Override + public Integer call() throws Exception { + Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf); + return pids.size(); + } + }); + } + + /** + * supervisor daemon enter entrance + * + * @param args + */ + public static void main(String[] args) { + Utils.setupDefaultUncaughtExceptionHandler(); + Supervisor instance = new Supervisor(); + instance.launch(new StandaloneSupervisor()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d46ed8fb/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java ----------------------------------------------------------------------
