This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
commit 45d141cc24b9c5b65a90fbb6667140350336f84b Author: Bharat Gulati <[email protected]> AuthorDate: Sat Jun 17 13:58:48 2023 +0530 support worker hook declaration in flux --- .../src/main/java/org/apache/storm/flux/Flux.java | 6 +++ .../java/org/apache/storm/flux/FluxBuilder.java | 17 ++++++++ .../org/apache/storm/flux/model/TopologyDef.java | 43 ++++++++++++++++++++ .../org/apache/storm/flux/model/WorkerHookDef.java | 25 ++++++++++++ .../test/java/org/apache/storm/flux/TCKTest.java | 13 ++++++ .../src/test/resources/configs/worker_hook.yaml | 47 ++++++++++++++++++++++ 6 files changed, 151 insertions(+) diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java index 15f492d10..c89c4d70c 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java @@ -39,6 +39,7 @@ import org.apache.storm.flux.model.ExecutionContext; import org.apache.storm.flux.model.SpoutDef; import org.apache.storm.flux.model.StreamDef; import org.apache.storm.flux.model.TopologyDef; +import org.apache.storm.flux.model.WorkerHookDef; import org.apache.storm.flux.parser.FluxParser; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.SubmitOptions; @@ -210,6 +211,11 @@ public class Flux { for (StreamDef sd : t.getStreams()) { printf("%s --%s--> %s", sd.getFrom(), sd.getGrouping().getType(), sd.getTo()); } + + print("--------------- WORKER HOOKS ---------------"); + for (WorkerHookDef whd : t.getWorkerHooks()) { + printf("%s (%s)", whd.getId(), whd.getClassName()); + } print("--------------------------------------"); } } diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java index 3e5fbe0d9..148852ebf 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java @@ -43,8 +43,10 @@ import org.apache.storm.flux.model.PropertyDef; import org.apache.storm.flux.model.SpoutDef; import org.apache.storm.flux.model.StreamDef; import org.apache.storm.flux.model.TopologyDef; +import org.apache.storm.flux.model.WorkerHookDef; import org.apache.storm.generated.StormTopology; import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.hooks.IWorkerHook; import org.apache.storm.topology.BoltDeclarer; import org.apache.storm.topology.IBasicBolt; import org.apache.storm.topology.IRichBolt; @@ -117,6 +119,9 @@ public class FluxBuilder { // process stream definitions buildStreamDefinitions(context, builder); + // create worker hooks + buildWorkerHooks(context, builder); + topology = builder.createTopology(); } else { // user class supplied... @@ -478,6 +483,18 @@ public class FluxBuilder { } } + /** + * Given a list of worker hook definitions, build a Storm worker hook implementation by attempting to find a matching + * constructor in the given worker hook class and add them to the topology builder. + */ + private static void buildWorkerHooks(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException, + NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException { + for (WorkerHookDef whDef: context.getTopologyDef().getWorkerHooks()) { + IWorkerHook workerHook = (IWorkerHook) buildObject(whDef, context); + builder.addWorkerHook(workerHook); + } + } + /** * Given a list of constructor arguments, and a target class, attempt to find a suitable constructor. */ diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java index a0b4a0329..7744e4c2d 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java @@ -53,6 +53,7 @@ public class TopologyDef { private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>(); private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>(); private List<StreamDef> streams = new ArrayList<StreamDef>(); + private Map<String, WorkerHookDef> workerHookMap = new LinkedHashMap<>(); public String getName() { @@ -163,6 +164,27 @@ public class TopologyDef { this.includes = includes; } + /** + * Returns worker hook definitions. + * @return worker hook definitions + */ + public List<WorkerHookDef> getWorkerHooks() { + ArrayList<WorkerHookDef> retval = new ArrayList<>(); + retval.addAll(this.workerHookMap.values()); + return retval; + } + + /** + * Sets worker hook definitions. + * @param workerHooks worker hook definitions + */ + public void setWorkerHooks(List<WorkerHookDef> workerHooks) { + this.workerHookMap = new LinkedHashMap<>(); + for (WorkerHookDef workerHook : workerHooks) { + this.workerHookMap.put(workerHook.getId(), workerHook); + } + } + // utility methods public int parallelismForBolt(String boltId) { return this.boltMap.get(boltId).getParallelism(); @@ -180,6 +202,10 @@ public class TopologyDef { return this.componentMap.get(id); } + public WorkerHookDef getWorkerHook(String id) { + return this.workerHookMap.get(id); + } + /** * Adds a list of bolt definitions. Optionally overriding existing definitions * if one with the same ID already exists. @@ -243,6 +269,23 @@ public class TopologyDef { this.streams.addAll(streams); } + /** + * Adds a list of worker hook definitions. Optionally overriding existing definitions + * if one with the same ID already exists. + * @param workerHooks worker hook definitions + * @param override whether or not to override existing definitions + */ + public void addAllWorkerHooks(List<WorkerHookDef> workerHooks, boolean override) { + for (WorkerHookDef workerHook : workerHooks) { + String id = workerHook.getId(); + if (this.workerHookMap.get(id) == null || override) { + this.workerHookMap.put(workerHook.getId(), workerHook); + } else { + LOG.warn("Ignoring attempt to create worker hook '{}' with override == false.", id); + } + } + } + public TopologySourceDef getTopologySource() { return topologySource; } diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/WorkerHookDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/WorkerHookDef.java new file mode 100644 index 000000000..05d717b87 --- /dev/null +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/WorkerHookDef.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.flux.model; + +/** + * Bean representation of a Storm worker hook. + */ +public class WorkerHookDef extends BeanDef { +} diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index f5bb25a55..951678573 100644 --- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -278,4 +278,17 @@ public class TCKTest { IllegalArgumentException expectedException = assertThrows(IllegalArgumentException.class, () -> FluxBuilder.buildTopology(context)); assertTrue(expectedException.getMessage().contains("Couldn't find a suitable static method")); } + + @Test + public void testTopologyWithWorkerHook() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/worker_hook.yaml", false, true, null, false); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + assertNotNull(topology); + assertTrue(topologyDef.getName().equals("worker-hook-topology")); + assertTrue(topologyDef.getWorkerHooks().size() > 0); + assertTrue(topology.get_worker_hooks_size() > 0); + topology.validate(); + } } diff --git a/flux/flux-core/src/test/resources/configs/worker_hook.yaml b/flux/flux-core/src/test/resources/configs/worker_hook.yaml new file mode 100644 index 000000000..65e54fc70 --- /dev/null +++ b/flux/flux-core/src/test/resources/configs/worker_hook.yaml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +--- +name: "worker-hook-topology" + +config: + topology.workers: 1 + +# spout definitions +spouts: + - id: "spout-1" + className: "org.apache.storm.testing.TestWordSpout" + parallelism: 1 + # ... + +# bolt definitions +bolts: + - id: "bolt-1" + className: "org.apache.storm.testing.TestWordCounter" + parallelism: 1 + +#stream definitions +# stream definitions define connections between spouts and bolts. +# note that such connections can be cyclical +streams: + - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) + from: "spout-1" + to: "bolt-1" + grouping: + type: SHUFFLE + +workerHooks: + - id: "base-worker-hook" + className: "org.apache.storm.hooks.BaseWorkerHook"
