STORM-2084: Refactor localization to combine files together
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78cb243c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78cb243c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78cb243c Branch: refs/heads/master Commit: 78cb243c4bc9aaeaffd6f1c76915ac20016b32e7 Parents: 66ff5fd Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Fri Sep 15 13:29:40 2017 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Thu Sep 21 15:59:24 2017 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 2 +- .../src/jvm/org/apache/storm/StormTimer.java | 2 +- .../org/apache/storm/testing4j_test.clj | 44 +- .../daemon/supervisor/ReadClusterState.java | 4 +- .../apache/storm/daemon/supervisor/Slot.java | 24 +- .../storm/daemon/supervisor/Supervisor.java | 53 +- .../daemon/supervisor/SupervisorUtils.java | 33 - .../daemon/supervisor/timer/UpdateBlobs.java | 111 -- .../org/apache/storm/event/EventManagerImp.java | 2 +- .../apache/storm/localizer/AsyncLocalizer.java | 1030 +++++++++++++++--- .../org/apache/storm/localizer/ILocalizer.java | 70 -- .../localizer/LocalDownloadedResource.java | 42 +- .../org/apache/storm/localizer/Localizer.java | 695 ------------ .../org/apache/storm/utils/ServerUtils.java | 18 +- .../storm/daemon/supervisor/SlotTest.java | 24 +- .../storm/localizer/AsyncLocalizerTest.java | 699 +++++++++++- .../apache/storm/localizer/LocalizerTest.java | 682 ------------ 17 files changed, 1627 insertions(+), 1908 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index c6ef390..679a74b 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -123,7 +123,7 @@ supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore" supervisor.blobstore.download.thread.count: 5 supervisor.blobstore.download.max_retries: 3 supervisor.localizer.cache.target.size.mb: 10240 -supervisor.localizer.cleanup.interval.ms: 600000 +supervisor.localizer.cleanup.interval.ms: 30000 nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore" nimbus.blobstore.expiration.secs: 600 http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-client/src/jvm/org/apache/storm/StormTimer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java index 4f6a7d5..b2e2b4a 100644 --- a/storm-client/src/jvm/org/apache/storm/StormTimer.java +++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java @@ -225,7 +225,7 @@ public class StormTimer implements AutoCloseable { */ @Override - public void close() throws Exception { + public void close() throws InterruptedException { if (this.task.isActive()) { this.task.setActive(false); this.task.interrupt(); http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj index 87e1fc0..1b12928 100644 --- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj @@ -65,13 +65,13 @@ (reify TestJob (^void run [this ^ILocalCluster cluster] (let [topology (Thrift/buildTopology - {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))} + {"spout" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))} {"2" (Thrift/prepareBoltDetails - {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID) (Thrift/prepareFieldsGrouping ["word"])} (TestWordCounter.) (Integer. 4)) "3" (Thrift/prepareBoltDetails - {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID) (Thrift/prepareGlobalGrouping)} (TestGlobalCount.)) "4" (Thrift/prepareBoltDetails @@ -79,7 +79,7 @@ (Thrift/prepareGlobalGrouping)} (TestAggregatesCounter.))}) mocked-sources (doto (MockedSources.) - (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"])) + (.addMockData "spout" (into-array Values [(Values. (into-array ["nathan"])) (Values. (into-array ["bob"])) (Values. (into-array ["joey"])) (Values. (into-array ["nathan"]))]) @@ -93,7 +93,7 @@ topology complete-topology-param)] (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]] - (Testing/readTuples results "1"))) + (Testing/readTuples results "spout"))) (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]] (Testing/readTuples results "2"))) (is (= [[1] [2] [3] [4]] @@ -102,18 +102,36 @@ (Testing/readTuples results "4"))) )))) -(deftest test-complete-topology - (doseq [zmq-on? [true false] - :let [daemon-conf (doto (Config.) - (.put STORM-LOCAL-MODE-ZMQ zmq-on?)) - mk-cluster-param (doto (MkClusterParam.) - (.setSupervisors (int 4)) - (.setDaemonConf daemon-conf))]] +(deftest test-complete-topology-netty-simulated + (let [daemon-conf (doto (Config.) + (.put STORM-LOCAL-MODE-ZMQ true)) + mk-cluster-param (doto (MkClusterParam.) + (.setSupervisors (int 4)) + (.setDaemonConf daemon-conf))] (Testing/withSimulatedTimeLocalCluster - mk-cluster-param complete-topology-testjob ) + mk-cluster-param complete-topology-testjob))) + +(deftest test-complete-topology-netty + (let [daemon-conf (doto (Config.) + (.put STORM-LOCAL-MODE-ZMQ true)) + mk-cluster-param (doto (MkClusterParam.) + (.setSupervisors (int 4)) + (.setDaemonConf daemon-conf))] (Testing/withLocalCluster mk-cluster-param complete-topology-testjob))) +(deftest test-complete-topology-local + (let [mk-cluster-param (doto (MkClusterParam.) + (.setSupervisors (int 4)))] + (Testing/withLocalCluster + mk-cluster-param complete-topology-testjob))) + +(deftest test-complete-topology-local-simulated + (let [mk-cluster-param (doto (MkClusterParam.) + (.setSupervisors (int 4)))] + (Testing/withSimulatedTimeLocalCluster + mk-cluster-param complete-topology-testjob))) + (deftest test-with-tracked-cluster (Testing/withTrackedCluster (reify TestJob http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java index e346a09..d68e512 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java @@ -41,7 +41,7 @@ import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.NodeInfo; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.generated.WorkerResources; -import org.apache.storm.localizer.ILocalizer; +import org.apache.storm.localizer.AsyncLocalizer; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.Time; @@ -60,7 +60,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { private final AtomicInteger readRetry = new AtomicInteger(0); private final String assignmentId; private final ISupervisor iSuper; - private final ILocalizer localizer; + private final AsyncLocalizer localizer; private final ContainerLauncher launcher; private final String host; private final LocalState localState; http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index d221b71..6533d15 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -43,7 +43,7 @@ import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.ProfileAction; import org.apache.storm.generated.ProfileRequest; -import org.apache.storm.localizer.ILocalizer; +import org.apache.storm.localizer.AsyncLocalizer; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; @@ -79,7 +79,7 @@ public class Slot extends Thread implements AutoCloseable { } static class StaticState { - public final ILocalizer localizer; + public final AsyncLocalizer localizer; public final long hbTimeoutMs; public final long firstHbTimeoutMs; public final long killSleepMs; @@ -90,10 +90,10 @@ public class Slot extends Thread implements AutoCloseable { public final ISupervisor iSupervisor; public final LocalState localState; - StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs, - long killSleepMs, long monitorFreqMs, - ContainerLauncher containerLauncher, String host, int port, - ISupervisor iSupervisor, LocalState localState) { + StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs, + long killSleepMs, long monitorFreqMs, + ContainerLauncher containerLauncher, String host, int port, + ISupervisor iSupervisor, LocalState localState) { this.localizer = localizer; this.hbTimeoutMs = hbTimeoutMs; this.firstHbTimeoutMs = firstHbTimeoutMs; @@ -684,12 +684,12 @@ public class Slot extends Thread implements AutoCloseable { private volatile DynamicState dynamicState; private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments; - public Slot(ILocalizer localizer, Map<String, Object> conf, - ContainerLauncher containerLauncher, String host, - int port, LocalState localState, - IStormClusterState clusterState, - ISupervisor iSupervisor, - AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception { + public Slot(AsyncLocalizer localizer, Map<String, Object> conf, + ContainerLauncher containerLauncher, String host, + int port, LocalState localState, + IStormClusterState clusterState, + ISupervisor iSupervisor, + AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception { super("SLOT_"+port); this.cachedCurrentAssignments = cachedCurrentAssignments; http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java index 1f8d4c3..08d32f1 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.File; @@ -25,7 +26,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; @@ -39,19 +39,15 @@ import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.daemon.DaemonCommon; import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; -import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; import org.apache.storm.event.EventManager; import org.apache.storm.event.EventManagerImp; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.localizer.AsyncLocalizer; -import org.apache.storm.localizer.ILocalizer; -import org.apache.storm.localizer.Localizer; import org.apache.storm.messaging.IContext; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerConfigUtils; -import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.LocalState; @@ -78,8 +74,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable { private final AtomicReference<Map<Long, LocalAssignment>> currAssignment; private final StormTimer heartbeatTimer; private final StormTimer eventTimer; - private final StormTimer blobUpdateTimer; - private final Localizer localizer; private final AsyncLocalizer asyncLocalizer; private EventManager eventManager; private ReadClusterState readState; @@ -110,10 +104,11 @@ public class Supervisor implements DaemonCommon, AutoCloseable { throw Utils.wrapInRuntime(e); } + this.currAssignment = new AtomicReference<>(new HashMap<>()); + try { this.localState = ServerConfigUtils.supervisorState(conf); - this.localizer = ServerUtils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf)); - this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer); + this.asyncLocalizer = new AsyncLocalizer(conf, currAssignment, localState.getLocalAssignmentsMap()); } catch (IOException e) { throw Utils.wrapInRuntime(e); } @@ -126,13 +121,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable { throw Utils.wrapInRuntime(e); } - this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>()); - this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); - - this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler()); } public String getId() { @@ -178,12 +169,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable { public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() { return currAssignment; } - - public Localizer getLocalizer() { - return localizer; - } - ILocalizer getAsyncLocalizer() { + AsyncLocalizer getAsyncLocalizer() { return asyncLocalizer; } @@ -199,8 +186,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable { String path = ServerConfigUtils.supervisorTmpDir(conf); FileUtils.cleanDirectory(new File(path)); - Localizer localizer = getLocalizer(); - SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this); hb.run(); // should synchronize supervisor so it doesn't launch anything after being down (optimization) @@ -209,36 +194,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable { this.eventManager = new EventManagerImp(false); this.readState = new ReadClusterState(this); - - Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf); - Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap(); - if (portToAssignments != null) { - Map<String, LocalAssignment> assignments = new HashMap<>(); - for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) { - assignments.put(la.get_topology_id(), la); - } - for (String topoId : downloadedTopoIds) { - LocalAssignment la = assignments.get(topoId); - if (la != null) { - SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner()); - } else { - LOG.warn("Could not find an owner for topo {}", topoId); - } - } - } - // do this after adding the references so we don't try to clean things being used - localizer.startCleaner(); - UpdateBlobs updateBlobsThread = new UpdateBlobs(this); + asyncLocalizer.start(); if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) { // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up // to date even if callbacks don't all work exactly right eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager)); - // Blob update thread. Starts with 30 seconds delay, every 30 seconds - blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager)); - // supervisor health check eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this)); } @@ -282,15 +245,13 @@ public class Supervisor implements DaemonCommon, AutoCloseable { this.active = false; heartbeatTimer.close(); eventTimer.close(); - blobUpdateTimer.close(); if (eventManager != null) { eventManager.close(); } if (readState != null) { readState.close(); } - asyncLocalizer.shutdown(); - localizer.shutdown(); + asyncLocalizer.close(); getStormClusterState().disconnect(); } catch (Exception e) { LOG.error("Error Shutting down", e); http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java index 09c2b5d..33574c3 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -19,9 +19,7 @@ package org.apache.storm.daemon.supervisor; import org.apache.storm.Config; import org.apache.storm.generated.LSWorkerHeartbeat; -import org.apache.storm.generated.LocalAssignment; import org.apache.storm.localizer.LocalResource; -import org.apache.storm.localizer.Localizer; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; @@ -33,14 +31,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.URLDecoder; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; public class SupervisorUtils { @@ -95,34 +90,6 @@ public class SupervisorUtils { return localResourceList; } - /** - * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the - * cache on restart. - * - * @param localizer - * @param stormId - * @param conf - */ - static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException { - Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME); - List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); - if (blobstoreMap != null) { - localizer.addReferences(localresources, user, topoName); - } - } - - public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException { - Set<String> stormIds = new HashSet<>(); - String path = ConfigUtils.supervisorStormDistRoot(conf); - Collection<String> rets = ConfigUtils.readDirContents(path); - for (String ret : rets) { - stormIds.add(URLDecoder.decode(ret)); - } - return stormIds; - } - public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) { String workerRoot = ConfigUtils.workerRoot(conf); return ConfigUtils.readDirContents(workerRoot); http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java deleted file mode 100644 index b5dbf57..0000000 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor.timer; - -import java.util.HashMap; -import org.apache.storm.Config; -import org.apache.storm.daemon.supervisor.Supervisor; -import org.apache.storm.daemon.supervisor.SupervisorUtils; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.KeyNotFoundException; -import org.apache.storm.generated.LocalAssignment; -import org.apache.storm.localizer.LocalResource; -import org.apache.storm.localizer.Localizer; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.NimbusLeaderNotFoundException; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - -/** - * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The - * Runnable is intended to be run periodically by a timer, created elsewhere. - */ -public class UpdateBlobs implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class); - - private Supervisor supervisor; - - public UpdateBlobs(Supervisor supervisor) { - this.supervisor = supervisor; - } - - @Override - public void run() { - try { - Map<String, Object> conf = supervisor.getConf(); - Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf); - AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment(); - Map<String, LocalAssignment> assignedStormIds = new HashMap<>(); - for (LocalAssignment localAssignment : newAssignment.get().values()) { - assignedStormIds.put(localAssignment.get_topology_id(), localAssignment); - } - for (String stormId : downloadedStormIds) { - LocalAssignment la = assignedStormIds.get(stormId); - if (la != null) { - if (la.get_owner() == null) { - //We got a case where the local assignment is not up to date, no point in going on... - LOG.warn("The blobs will not be updated for {} until the local assignment is updated...", stormId); - } else { - String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); - LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot); - updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner()); - } - } - } - } catch (Exception e) { - if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) { - LOG.error("Network error while updating blobs, will retry again later", e); - } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) { - LOG.error("Nimbus unavailable to update blobs, will retry again later", e); - } else { - throw Utils.wrapInRuntime(e); - } - } - } - - /** - * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded. - * - * @param conf - * @param stormId - * @param localizer - * @throws IOException - */ - private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer, String user) throws IOException { - Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); - try { - localizer.updateBlobs(localresources, user); - } catch (AuthorizationException authExp) { - LOG.error("AuthorizationException error", authExp); - } catch (KeyNotFoundException knf) { - LOG.error("KeyNotFoundException error", knf); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java index 0a64370..6b9d4f1 100644 --- a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java +++ b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java @@ -73,7 +73,7 @@ public class EventManagerImp implements EventManager { runner.start(); } - public void proccessInc() { + private void proccessInc() { processed.incrementAndGet(); }