Repository: storm
Updated Branches:
  refs/heads/master 26818f2b4 -> 8120e15ef

fix STORM-2979 Add in WorkerState an internal list of workerHooks and use it to start/stop

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce7c716f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce7c716f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce7c716f

Branch: refs/heads/master
Commit: ce7c716fc55c9af792998c2b3d512018cdbd8bec
Parents: ffa607e
Author: michelo <michelo@michelo-desktop>
Authored: Sat Apr 7 18:13:12 2018 +0200
Committer: michelo <michelo@michelo-desktop>
Committed: Sat Apr 7 18:13:12 2018 +0200

----------------------------------------------------------------------
 .../apache/storm/daemon/worker/WorkerState.java | 28 +++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7c716f/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 9c7cf9e..b763c8d 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -63,7 +63,7 @@ import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.generated.TopologyStatus;
 import org.apache.storm.grouping.Load;
 import org.apache.storm.grouping.LoadMapping;
-import org.apache.storm.hooks.BaseWorkerHook;
+import org.apache.storm.hooks.IWorkerHook;
 import org.apache.storm.messaging.ConnectionWithStatus;
 import org.apache.storm.messaging.DeserializingConnectionCallback;
 import org.apache.storm.messaging.IConnection;
@@ -95,6 +95,11 @@ public class WorkerState {
     final IContext mqContext;
     private final WorkerTransfer workerTransfer;
     private final BackPressureTracker bpTracker;
+    private final List<IWorkerHook> deserializedWorkerHooks;
+
+    public List<IWorkerHook> getDeserializedWorkerHooks() {
+        return deserializedWorkerHooks;
+    }
 
     public Map getConf() {
         return conf;
@@ -339,6 +344,15 @@ public class WorkerState {
         int maxTaskId = getMaxTaskId(componentToSortedTasks);
         this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
         this.bpTracker = new BackPressureTracker(workerId, localTaskIds);
+        //
+        this.deserializedWorkerHooks = new ArrayList<>();
+        if (topology.is_set_worker_hooks()) {
+            for (ByteBuffer hook : topology.get_worker_hooks()) {
+                byte[] hookBytes = Utils.toByteArray(hook);
+                IWorkerHook hookObject = Utils.javaDeserialize(hookBytes, IWorkerHook.class);
+                deserializedWorkerHooks.add(hookObject);
+            }
+        }
     }
 
     public void refreshConnections() {
@@ -599,20 +613,16 @@ public class WorkerState {
     public void runWorkerStartHooks() {
         WorkerTopologyContext workerContext = getWorkerTopologyContext();
         if (topology.is_set_worker_hooks()) {
-            for (ByteBuffer hook : topology.get_worker_hooks()) {
-                byte[] hookBytes = Utils.toByteArray(hook);
-                BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
-                hookObject.start(topologyConf, workerContext);
+            for (IWorkerHook hook : getDeserializedWorkerHooks()) {
+                hook.start(topologyConf, workerContext);
             }
         }
     }
 
     public void runWorkerShutdownHooks() {
         if (topology.is_set_worker_hooks()) {
-            for (ByteBuffer hook : topology.get_worker_hooks()) {
-                byte[] hookBytes = Utils.toByteArray(hook);
-                BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
-                hookObject.shutdown();
+            for (IWorkerHook hook : getDeserializedWorkerHooks()) {
+                hook.shutdown();
             }
         }
     }