http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/thrift.clj b/storm-core/src/clj/org/apache/storm/thrift.clj deleted file mode 100644 index 779c1d1..0000000 --- a/storm-core/src/clj/org/apache/storm/thrift.clj +++ /dev/null @@ -1,286 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.thrift - (:import [java.util HashMap] - [java.io Serializable] - [org.apache.storm.generated NodeInfo Assignment]) - (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology - StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface - ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo - GlobalStreamId ComponentObject ComponentObject$_Fields - ShellComponent SupervisorInfo]) - (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils]) - (:import [org.apache.storm Constants]) - (:import [org.apache.storm.security.auth ReqContext]) - (:import [org.apache.storm.grouping CustomStreamGrouping]) - (:import [org.apache.storm.topology TopologyBuilder]) - (:import [org.apache.storm.clojure RichShellBolt RichShellSpout]) - (:import [org.apache.thrift.transport TTransport] - (org.json.simple JSONValue)) - (:use [org.apache.storm util config log zookeeper])) - -(defn instantiate-java-object - [^JavaObject obj] - (let [name (symbol (.get_full_class_name obj)) - args (map (memfn getFieldValue) (.get_args_list obj))] - (eval `(new ~name ~@args)))) - -(def grouping-constants - {Grouping$_Fields/FIELDS :fields - Grouping$_Fields/SHUFFLE :shuffle - Grouping$_Fields/ALL :all - Grouping$_Fields/NONE :none - Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized - Grouping$_Fields/CUSTOM_OBJECT :custom-object - Grouping$_Fields/DIRECT :direct - Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle}) - -(defn grouping-type - [^Grouping grouping] - (grouping-constants (.getSetField grouping))) - -(defn field-grouping - [^Grouping grouping] - (when-not (= (grouping-type grouping) :fields) - (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping"))) - (.get_fields grouping)) - -(defn global-grouping? - [^Grouping grouping] - (and (= :fields (grouping-type grouping)) - (empty? (field-grouping grouping)))) - -(defn parallelism-hint - [^ComponentCommon component-common] - (let [phint (.get_parallelism_hint component-common)] - (if-not (.is_set_parallelism_hint component-common) 1 phint))) - -(defn nimbus-client-and-conn - ([host port] - (nimbus-client-and-conn host port nil)) - ([host port as-user] - (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user) - (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) - nimbusClient (NimbusClient. conf host port nil as-user) - client (.getClient nimbusClient) - transport (.transport nimbusClient)] - [client transport] ))) - -(defmacro with-nimbus-connection - [[client-sym host port] & body] - `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)] - (try - ~@body - (finally (.close conn#))))) - -(defmacro with-configured-nimbus-connection - [client-sym & body] - `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig)) - context# (ReqContext/context) - user# (if (.principal context#) (.getName (.principal context#))) - nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#) - ~client-sym (.getClient nimbusClient#) - conn# (.transport nimbusClient#) - ] - (try - ~@body - (finally (.close conn#))))) - -(defn direct-output-fields - [fields] - (StreamInfo. fields true)) - -(defn output-fields - [fields] - (StreamInfo. fields false)) - -;TODO: when translating this function, you should replace the map-val with a proper for loop HERE -(defn mk-output-spec - [output-spec] - (let [output-spec (if (map? output-spec) - output-spec - {Utils/DEFAULT_STREAM_ID output-spec})] - (map-val - (fn [out] - (if (instance? StreamInfo out) - out - (StreamInfo. out false))) - output-spec))) - -(defnk mk-plain-component-common - [inputs output-spec parallelism-hint :conf nil] - (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))] - (when parallelism-hint - (.set_parallelism_hint ret parallelism-hint)) - (when conf - (.set_json_conf ret (JSONValue/toJSONString conf))) - ret)) - -(defnk mk-spout-spec* - [spout outputs :p nil :conf nil] - (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout)) - (mk-plain-component-common {} outputs p :conf conf))) - -(defn mk-shuffle-grouping - [] - (Grouping/shuffle (NullStruct.))) - -(defn mk-local-or-shuffle-grouping - [] - (Grouping/local_or_shuffle (NullStruct.))) - -(defn mk-fields-grouping - [fields] - (Grouping/fields fields)) - -(defn mk-global-grouping - [] - (mk-fields-grouping [])) - -(defn mk-direct-grouping - [] - (Grouping/direct (NullStruct.))) - -(defn mk-all-grouping - [] - (Grouping/all (NullStruct.))) - -(defn mk-none-grouping - [] - (Grouping/none (NullStruct.))) - -(defn deserialized-component-object - [^ComponentObject obj] - (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA) - (throw (RuntimeException. "Cannot deserialize non-java-serialized object"))) - (Utils/javaDeserialize (.get_serialized_java obj) Serializable)) - -(defn serialize-component-object - [obj] - (ComponentObject/serialized_java (Utils/javaSerialize obj))) - -(defn- mk-grouping - [grouping-spec] - (cond (nil? grouping-spec) - (mk-none-grouping) - - (instance? Grouping grouping-spec) - grouping-spec - - (instance? CustomStreamGrouping grouping-spec) - (Grouping/custom_serialized (Utils/javaSerialize grouping-spec)) - - (instance? JavaObject grouping-spec) - (Grouping/custom_object grouping-spec) - - (sequential? grouping-spec) - (mk-fields-grouping grouping-spec) - - (= grouping-spec :shuffle) - (mk-shuffle-grouping) - - (= grouping-spec :local-or-shuffle) - (mk-local-or-shuffle-grouping) - (= grouping-spec :none) - (mk-none-grouping) - - (= grouping-spec :all) - (mk-all-grouping) - - (= grouping-spec :global) - (mk-global-grouping) - - (= grouping-spec :direct) - (mk-direct-grouping) - - true - (throw (IllegalArgumentException. - (str grouping-spec " is not a valid grouping"))))) - -(defn- mk-inputs - [inputs] - (into {} (for [[stream-id grouping-spec] inputs] - [(if (sequential? stream-id) - (GlobalStreamId. (first stream-id) (second stream-id)) - (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID)) - (mk-grouping grouping-spec)]))) - -(defnk mk-bolt-spec* - [inputs bolt outputs :p nil :conf nil] - (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)] - (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt)) - common))) - -(defnk mk-spout-spec - [spout :parallelism-hint nil :p nil :conf nil] - (let [parallelism-hint (if p p parallelism-hint)] - {:obj spout :p parallelism-hint :conf conf})) - -(defn- shell-component-params - [command script-or-output-spec kwargs] - (if (string? script-or-output-spec) - [(into-array String [command script-or-output-spec]) - (first kwargs) - (rest kwargs)] - [(into-array String command) - script-or-output-spec - kwargs])) - -(defnk mk-bolt-spec - [inputs bolt :parallelism-hint nil :p nil :conf nil] - (let [parallelism-hint (if p p parallelism-hint)] - {:obj bolt :inputs inputs :p parallelism-hint :conf conf})) - -(defn mk-shell-bolt-spec - [inputs command script-or-output-spec & kwargs] - (let [[command output-spec kwargs] - (shell-component-params command script-or-output-spec kwargs)] - (apply mk-bolt-spec inputs - (RichShellBolt. command (mk-output-spec output-spec)) kwargs))) - -(defn mk-shell-spout-spec - [command script-or-output-spec & kwargs] - (let [[command output-spec kwargs] - (shell-component-params command script-or-output-spec kwargs)] - (apply mk-spout-spec - (RichShellSpout. command (mk-output-spec output-spec)) kwargs))) - -(defn- add-inputs - [declarer inputs] - (doseq [[id grouping] (mk-inputs inputs)] - (.grouping declarer id grouping))) - -(defn mk-topology - ([spout-map bolt-map] - (let [builder (TopologyBuilder.)] - (doseq [[name {spout :obj p :p conf :conf}] spout-map] - (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf))) - (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map] - (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs))) - (.createTopology builder))) - ([spout-map bolt-map state-spout-map] - (mk-topology spout-map bolt-map))) - -;; clojurify-structure is needed or else every element becomes the same after successive calls -;; don't know why this happens -(def STORM-TOPOLOGY-FIELDS - (-> StormTopology/metaDataMap clojurify-structure keys)) - -(def SPOUT-FIELDS - [StormTopology$_Fields/SPOUTS - StormTopology$_Fields/STATE_SPOUTS]) -
http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index 1bf85d4..5b5acdb 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -49,7 +49,7 @@ (:require [compojure.route :as route] [compojure.handler :as handler] [ring.util.response :as resp] - [org.apache.storm [thrift :as thrift]]) + [org.apache.storm.internal [thrift :as thrift]]) (:require [metrics.meters :refer [defmeter mark!]]) (:import [org.apache.commons.lang StringEscapeUtils]) (:import [org.apache.logging.log4j Level]) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/Thrift.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Thrift.java b/storm-core/src/jvm/org/apache/storm/Thrift.java new file mode 100644 index 0000000..cde822f --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/Thrift.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm; + +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.JavaObjectArg; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StateSpoutSpec; +import org.apache.storm.generated.StreamInfo; + +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.HashMap; +import java.io.Serializable; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.storm.generated.JavaObject; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.StormTopology._Fields; +import org.apache.storm.generated.ComponentCommon; +import org.apache.storm.generated.NullStruct; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.ComponentObject; + +import org.apache.storm.task.IBolt; +import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IBasicBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.SpoutDeclarer; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.utils.Utils; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.topology.TopologyBuilder; + +public class Thrift { + private static Logger LOG = LoggerFactory.getLogger(Thrift.class); + + private static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null; + private static StormTopology._Fields[] SPOUT_FIELDS = + { StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS }; + + static { + Set<_Fields> keys = StormTopology.metaDataMap.keySet(); + keys.toArray(STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()]); + } + + public static StormTopology._Fields[] getTopologyFields() { + return STORM_TOPOLOGY_FIELDS; + } + + public static StormTopology._Fields[] getSpoutFields() { + return SPOUT_FIELDS; + } + + public static class SpoutDetails { + private IRichSpout spout; + private Integer parallelism; + private Map conf; + + public SpoutDetails(IRichSpout spout, Integer parallelism, Map conf) { + this.spout = spout; + this.parallelism = parallelism; + this.conf = conf; + } + + public IRichSpout getSpout() { + return spout; + } + + public Integer getParallelism() { + return parallelism; + } + + public Map getConf() { + return conf; + } + } + + public static class BoltDetails { + private Object bolt; + private Map conf; + private Integer parallelism; + private Map<GlobalStreamId, Grouping> inputs; + + public BoltDetails(Object bolt, Map conf, Integer parallelism, + Map<GlobalStreamId, Grouping> inputs) { + this.bolt = bolt; + this.conf = conf; + this.parallelism = parallelism; + this.inputs = inputs; + } + + public Object getBolt() { + return bolt; + } + + public Map getConf() { + return conf; + } + + public Map<GlobalStreamId, Grouping> getInputs() { + return inputs; + } + + public Integer getParallelism() { + return parallelism; + } + } + + public static StreamInfo directOutputFields(List<String> fields) { + return new StreamInfo(fields, true); + } + + public static StreamInfo outputFields(List<String> fields) { + return new StreamInfo(fields, false); + } + + public static Grouping prepareShuffleGrouping() { + return Grouping.shuffle(new NullStruct()); + } + + public static Grouping prepareLocalOrShuffleGrouping() { + return Grouping.local_or_shuffle(new NullStruct()); + } + + public static Grouping prepareFieldsGrouping(List<String> fields) { + return Grouping.fields(fields); + } + + public static Grouping prepareGlobalGrouping() { + return prepareFieldsGrouping(new ArrayList<String>()); + } + + public static Grouping prepareDirectGrouping() { + return Grouping.direct(new NullStruct()); + } + + public static Grouping prepareAllGrouping() { + return Grouping.all(new NullStruct()); + } + + public static Grouping prepareNoneGrouping() { + return Grouping.none(new NullStruct()); + } + + public static Grouping prepareCustomStreamGrouping(Object obj) { + return Grouping.custom_serialized(Utils.javaSerialize(obj)); + } + + public static Grouping prepareCustomJavaObjectGrouping(JavaObject obj) { + return Grouping.custom_object(obj); + } + + public static Object instantiateJavaObject(JavaObject obj) { + + List<JavaObjectArg> args = obj.get_args_list(); + Class[] paraTypes = new Class[args.size()]; + Object[] paraValues = new Object[args.size()]; + for (int i = 0; i < args.size(); i++) { + JavaObjectArg arg = args.get(i); + paraValues[i] = arg.getFieldValue(); + + if (arg.getSetField().equals(JavaObjectArg._Fields.INT_ARG)) { + paraTypes[i] = Integer.class; + } else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) { + paraTypes[i] = Long.class; + } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) { + paraTypes[i] = String.class; + } else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) { + paraTypes[i] = Boolean.class; + } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) { + paraTypes[i] = ByteBuffer.class; + } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) { + paraTypes[i] = Double.class; + } else { + paraTypes[i] = Object.class; + } + } + + try { + Class clazz = Class.forName(obj.get_full_class_name()); + Constructor cons = clazz.getConstructor(paraTypes); + return cons.newInstance(paraValues); + } catch (Exception e) { + LOG.error("java object instantiation failed", e); + } + + return null; + + } + + public static Grouping._Fields groupingType(Grouping grouping) { + return grouping.getSetField(); + } + + public static List<String> fieldGrouping(Grouping grouping) { + if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) { + throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping"); + } + return grouping.get_fields(); + } + + public static boolean isGlobalGrouping(Grouping grouping) { + if (Grouping._Fields.FIELDS.equals(groupingType(grouping))) { + return fieldGrouping(grouping).isEmpty(); + } + + return false; + } + + public static int getParallelismHint(ComponentCommon componentCommon) { + if (!componentCommon.is_set_parallelism_hint()) { + return 1; + } else { + return componentCommon.get_parallelism_hint(); + } + } + + public static ComponentObject serializeComponentObject(Object obj) { + return ComponentObject.serialized_java(Utils.javaSerialize(obj)); + } + + public static Object deserializeComponentObject(ComponentObject obj) { + if (obj.getSetField() != ComponentObject._Fields.SERIALIZED_JAVA) { + throw new RuntimeException("Cannot deserialize non-java-serialized object"); + } + return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class); + } + + public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String, + StreamInfo> outputs, Integer parallelismHint) { + return prepareComponentCommon(inputs, outputs, parallelismHint, null); + } + + public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String, StreamInfo> outputs, + Integer parallelismHint, Map conf) { + Map<GlobalStreamId, Grouping> mappedInputs = new HashMap<>(); + Map<String, StreamInfo> mappedOutputs = new HashMap<>(); + if (inputs != null && !inputs.isEmpty()) { + mappedInputs.putAll(inputs); + } + if (outputs !=null && !outputs.isEmpty()) { + mappedOutputs.putAll(outputs); + } + ComponentCommon component = new ComponentCommon(mappedInputs, mappedOutputs); + if (parallelismHint != null) { + component.set_parallelism_hint(parallelismHint); + } + if (conf != null) { + component.set_json_conf(JSONValue.toJSONString(conf)); + } + return component; + } + + public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) { + return new SpoutSpec(ComponentObject.serialized_java + (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap(), outputs, null, null)); + } + + public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs, + Integer parallelismHint, Map conf) { + ComponentCommon common = prepareComponentCommon(inputs, outputs, parallelismHint, conf); + return new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common); + } + + public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt) { + return prepareBoltDetails(inputs, bolt, null, null); + } + + public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt, + Integer parallelismHint) { + return prepareBoltDetails(inputs, bolt, parallelismHint, null); + } + + public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt, + Integer parallelismHint, Map conf) { + BoltDetails details = new BoltDetails(bolt, conf, parallelismHint, inputs); + return details; + } + + public static SpoutDetails prepareSpoutDetails(IRichSpout spout) { + return prepareSpoutDetails(spout, null, null); + } + + public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint) { + return prepareSpoutDetails(spout, parallelismHint, null); + } + + public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint, Map conf) { + SpoutDetails details = new SpoutDetails(spout, parallelismHint, conf); + return details; + } + + public static StormTopology buildTopology(HashMap<String, SpoutDetails> spoutMap, + HashMap<String, BoltDetails> boltMap, HashMap<String, StateSpoutSpec> stateMap) { + return buildTopology(spoutMap, boltMap); + } + + private static void addInputs(BoltDeclarer declarer, Map<GlobalStreamId, Grouping> inputs) { + for(Entry<GlobalStreamId, Grouping> entry : inputs.entrySet()) { + declarer.grouping(entry.getKey(), entry.getValue()); + } + } + + public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) { + TopologyBuilder builder = new TopologyBuilder(); + for (Entry<String, SpoutDetails> entry : spoutMap.entrySet()) { + String spoutID = entry.getKey(); + SpoutDetails spec = entry.getValue(); + SpoutDeclarer spoutDeclarer = builder.setSpout(spoutID, spec.getSpout(), spec.getParallelism()); + spoutDeclarer.addConfigurations(spec.getConf()); + } + for (Entry<String, BoltDetails> entry : boltMap.entrySet()) { + String spoutID = entry.getKey(); + BoltDetails spec = entry.getValue(); + BoltDeclarer boltDeclarer = null; + if (spec.bolt instanceof IRichBolt) { + boltDeclarer = builder.setBolt(spoutID, (IRichBolt)spec.getBolt(), spec.getParallelism()); + } else { + boltDeclarer = builder.setBolt(spoutID, (IBasicBolt)spec.getBolt(), spec.getParallelism()); + } + boltDeclarer.addConfigurations(spec.getConf()); + addInputs(boltDeclarer, spec.getInputs()); + } + return builder.createTopology(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java b/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java index 45b263d..06853fe 100644 --- a/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java +++ b/storm-core/src/jvm/org/apache/storm/testing/NGrouping.java @@ -28,9 +28,7 @@ public class NGrouping implements CustomStreamGrouping { int _n; List<Integer> _outTasks; - public NGrouping(int n) { - _n = n; - } + public NGrouping(Integer n) {_n = n;} @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java index eaf7dc8..4beec48 100644 --- a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java +++ b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java @@ -28,18 +28,22 @@ import org.apache.storm.topology.OutputFieldsDeclarer; public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt { private static final long serialVersionUID = 1999209252187463355L; - - public PythonShellMetricsBolt(String[] command) { - super(command); + + public PythonShellMetricsBolt(String[] args) { + super(args); } + public PythonShellMetricsBolt(String command, String file) { + super(command, file); + } + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { super.prepare(stormConf, context, collector); - + CountShellMetric cMetric = new CountShellMetric(); context.registerMetric("my-custom-shell-metric", cMetric, 5); } - + public void declareOutputFields(OutputFieldsDeclarer declarer) { } http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java index ed6de14..657baa6 100644 --- a/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java +++ b/storm-core/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java @@ -33,11 +33,15 @@ public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout { public PythonShellMetricsSpout(String[] command) { super(command); } - + + public PythonShellMetricsSpout(String command, String file) { + super(command, file); + } + @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { super.open(conf, context, collector); - + CountShellMetric cMetric = new CountShellMetric(); context.registerMetric("my-custom-shellspout-metric", cMetric, 5); } http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index 9a849ea..eca9690 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -52,7 +52,6 @@ import org.apache.thrift.TSerializer; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; -import org.eclipse.jetty.util.log.Log; import org.json.simple.JSONValue; import org.json.simple.parser.ParseException; import org.slf4j.Logger; @@ -1456,6 +1455,13 @@ public class Utils { return number & Integer.MAX_VALUE; } + public static GlobalStreamId getGlobalStreamId(String streamId, String componentId) { + if (componentId == null) { + return new GlobalStreamId(streamId, DEFAULT_STREAM_ID); + } + return new GlobalStreamId(streamId, componentId); + } + public static RuntimeException wrapInRuntime(Exception e){ if (e instanceof RuntimeException){ return (RuntimeException)e; http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj index 5ba6651..6dce7d6 100644 --- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj @@ -15,26 +15,37 @@ ;; limitations under the License. (ns integration.org.apache.storm.integration-test (:use [clojure test]) - (:import [org.apache.storm Config]) + (:import [org.apache.storm Config Thrift]) (:import [org.apache.storm.topology TopologyBuilder]) (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout]) (:import [org.apache.storm.tuple Fields]) - (:use [org.apache.storm testing config clojure]) + (:use [org.apache.storm testing config util]) + (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common]) - (:require [org.apache.storm [thrift :as thrift]])) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) (deftest test-basic-topology (doseq [zmq-on? [true false]] (with-simulated-time-local-cluster [cluster :supervisors 4 :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}] - (let [topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4) - "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.)) - "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.)) - }) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestWordSpout. true) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareFieldsGrouping ["word"])} + (TestWordCounter.) (Integer. 4)) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping)} + (TestGlobalCount.)) + "4" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareGlobalGrouping)} + (TestAggregatesCounter.))}) results (complete-topology cluster topology :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]} @@ -60,12 +71,14 @@ (deftest test-multi-tasks-per-executor (with-simulated-time-local-cluster [cluster :supervisors 4] - (let [topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true))} - {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id - :parallelism-hint 3 - :conf {TOPOLOGY-TASKS 6}) - }) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareAllGrouping)} + emit-task-id + (Integer. 3) + {TOPOLOGY-TASKS 6})}) results (complete-topology cluster topology :mock-sources {"1" [["a"]]})] @@ -98,9 +111,11 @@ (let [feeder (feeder-spout ["field1"]) tracker (AckFailMapTracker.) _ (.setAckFailDelegate feeder tracker) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder)} - {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})] + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping)} ack-every-other)})] (submit-local-topology (:nimbus cluster) "timeout-tester" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} @@ -117,24 +132,36 @@ ))) (defn mk-validate-topology-1 [] - (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)})) + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareFieldsGrouping ["word"])} + (TestWordCounter.) (Integer. 4))})) (defn mk-invalidate-topology-1 [] - (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)})) + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "3" nil) + (Thrift/prepareFieldsGrouping ["word"])} + (TestWordCounter.) (Integer. 4))})) (defn mk-invalidate-topology-2 [] - (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)})) + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareFieldsGrouping ["non-exists-field"])} + (TestWordCounter.) (Integer. 4))})) (defn mk-invalidate-topology-3 [] - (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)})) + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" "non-exists-stream") + (Thrift/prepareFieldsGrouping ["word"])} + (TestWordCounter.) (Integer. 4))})) (defn try-complete-wc-topology [cluster topology] (try (do @@ -164,10 +191,15 @@ (deftest test-system-stream ;; this test works because mocking a spout splits up the tuples evenly among the tasks (with-simulated-time-local-cluster [cluster] - (let [topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)} - {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1) - }) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestWordSpout. true) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareFieldsGrouping ["word"]) + (Utils/getGlobalStreamId "1" "__system") + (Thrift/prepareGlobalGrouping)} + identity-bolt (Integer. 1))}) results (complete-topology cluster topology :mock-sources {"1" [["a"] ["b"] ["c"]]} @@ -218,20 +250,38 @@ [feeder3 checker3] (ack-tracking-feeder ["num"]) tracked (mk-tracked-topology cluster - (topology - {"1" (spout-spec feeder1) - "2" (spout-spec feeder2) - "3" (spout-spec feeder3)} - {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2)) - "5" (bolt-spec {"2" :shuffle} (branching-bolt 4)) - "6" (bolt-spec {"3" :shuffle} (branching-bolt 1)) - "7" (bolt-spec - {"4" :shuffle - "5" :shuffle - "6" :shuffle} + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder1) + "2" (Thrift/prepareSpoutDetails feeder2) + "3" (Thrift/prepareSpoutDetails feeder3)} + {"4" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + (branching-bolt 2)) + "5" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareShuffleGrouping)} + (branching-bolt 4)) + "6" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "3" nil) + (Thrift/prepareShuffleGrouping)} + (branching-bolt 1)) + "7" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "4" nil) + (Thrift/prepareShuffleGrouping) + (Utils/getGlobalStreamId "5" nil) + (Thrift/prepareShuffleGrouping) + (Utils/getGlobalStreamId "6" nil) + (Thrift/prepareShuffleGrouping)} (agg-bolt 3)) - "8" (bolt-spec {"7" :shuffle} (branching-bolt 2)) - "9" (bolt-spec {"8" :shuffle} ack-bolt)} + "8" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "7" nil) + (Thrift/prepareShuffleGrouping)} + (branching-bolt 2)) + "9" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "8" nil) + (Thrift/prepareShuffleGrouping)} + ack-bolt)} ))] (submit-local-topology (:nimbus cluster) "acking-test1" @@ -268,13 +318,21 @@ (let [[feeder checker] (ack-tracking-feeder ["num"]) tracked (mk-tracked-topology cluster - (topology - {"1" (spout-spec feeder)} - {"2" (bolt-spec {"1" :shuffle} identity-bolt) - "3" (bolt-spec {"1" :shuffle} identity-bolt) - "4" (bolt-spec - {"2" :shuffle - "3" :shuffle} + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + identity-bolt) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + identity-bolt) + "4" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareShuffleGrouping) + (Utils/getGlobalStreamId "3" nil) + (Thrift/prepareShuffleGrouping)} (agg-bolt 4))}))] (submit-local-topology (:nimbus cluster) "test-acking2" @@ -314,10 +372,13 @@ (let [feeder (feeder-spout ["field1"]) tracker (AckFailMapTracker.) _ (.setAckFailDelegate feeder tracker) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder) - "2" (thrift/mk-spout-spec open-tracked-spout)} - {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})] + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder) + "2" (Thrift/prepareSpoutDetails open-tracked-spout)} + {"3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping)} + prepare-tracked-bolt)})] (reset! bolt-prepared? false) (reset! spout-opened? false) @@ -343,10 +404,16 @@ (let [[feeder checker] (ack-tracking-feeder ["num"]) tracked (mk-tracked-topology cluster - (topology - {"1" (spout-spec feeder)} - {"2" (bolt-spec {"1" :shuffle} dup-anchor) - "3" (bolt-spec {"2" :shuffle} ack-bolt)}))] + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + dup-anchor) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareShuffleGrouping)} + ack-bolt)}))] (submit-local-topology (:nimbus cluster) "test" {} @@ -362,36 +429,6 @@ (checker 3) ))) -;; (defspout ConstantSpout ["val"] {:prepare false} -;; [collector] -;; (Time/sleep 100) -;; (emit-spout! collector [1])) - -;; (def errored (atom false)) -;; (def restarted (atom false)) - -;; (defbolt local-error-checker {} [tuple collector] -;; (when-not @errored -;; (reset! errored true) -;; (println "erroring") -;; (throw (RuntimeException.))) -;; (when-not @restarted (println "restarted")) -;; (reset! restarted true)) - -;; (deftest test-no-halt-local-mode -;; (with-simulated-time-local-cluster [cluster] -;; (let [topology (topology -;; {1 (spout-spec ConstantSpout)} -;; {2 (bolt-spec {1 :shuffle} local-error-checker) -;; })] -;; (submit-local-topology (:nimbus cluster) -;; "test" -;; {} -;; topology) -;; (while (not @restarted) -;; (advance-time-ms! 100)) -;; ))) - (defspout IncSpout ["word"] [conf context collector] (let [state (atom 0)] @@ -416,23 +453,6 @@ ) ))) -;; (deftest test-clojure-spout -;; (with-local-cluster [cluster] -;; (let [nimbus (:nimbus cluster) -;; top (topology -;; {1 (spout-spec IncSpout)} -;; {} -;; )] -;; (submit-local-topology nimbus -;; "spout-test" -;; {TOPOLOGY-DEBUG true -;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3} -;; top) -;; (Thread/sleep 10000) -;; (.killTopology nimbus "spout-test") -;; (Thread/sleep 10000) -;; ))) - (deftest test-kryo-decorators-config (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true @@ -513,11 +533,13 @@ (deftest test-hooks (with-simulated-time-local-cluster [cluster] - (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"]))) - } - {"2" (bolt-spec {"1" :shuffle} - hooks-bolt) - }) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. (Fields. ["conf"])))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + hooks-bolt)}) results (complete-topology cluster topology :mock-sources {"1" [[1] @@ -545,9 +567,12 @@ [feeder checker] (ack-tracking-feeder ["num"]) tracked (mk-tracked-topology cluster - (topology - {"1" (spout-spec feeder)} - {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)})) + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + report-errors-bolt)})) _ (submit-local-topology (:nimbus cluster) "test-errors" {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10 http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj index e86e893..3b1a48b 100644 --- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj @@ -15,15 +15,19 @@ ;; limitations under the License. (ns integration.org.apache.storm.testing4j-test (:use [clojure.test]) - (:use [org.apache.storm config clojure testing]) + (:use [org.apache.storm config testing util]) + (:use [org.apache.storm.internal clojure]) (:require [integration.org.apache.storm.integration-test :as it]) - (:require [org.apache.storm.thrift :as thrift]) - (:import [org.apache.storm Testing Config ILocalCluster]) + (:require [org.apache.storm.internal.thrift :as thrift]) + (:import [org.apache.storm Testing Config ILocalCluster] + [org.apache.storm.generated GlobalStreamId]) (:import [org.apache.storm.tuple Values Tuple]) (:import [org.apache.storm.utils Time Utils]) (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam - AckFailMapTracker MkTupleParam])) + AckFailMapTracker MkTupleParam]) + (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm Thrift])) (deftest test-with-simulated-time (is (= false (Time/isSimulating))) @@ -69,12 +73,20 @@ (Testing/withSimulatedTimeLocalCluster (reify TestJob (^void run [this ^ILocalCluster cluster] - (let [topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} - {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4) - "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.)) - "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.)) - }) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))} + {"2" (Thrift/prepareBoltDetails + {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareFieldsGrouping ["word"])} + (TestWordCounter.) (Integer. 4)) + "3" (Thrift/prepareBoltDetails + {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareGlobalGrouping)} + (TestGlobalCount.)) + "4" (Thrift/prepareBoltDetails + {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareGlobalGrouping)} + (TestAggregatesCounter.))}) mocked-sources (doto (MockedSources.) (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"])) (Values. (into-array ["bob"])) @@ -106,13 +118,21 @@ (let [[feeder checker] (it/ack-tracking-feeder ["num"]) tracked (Testing/mkTrackedTopology cluster - (topology - {"1" (spout-spec feeder)} - {"2" (bolt-spec {"1" :shuffle} it/identity-bolt) - "3" (bolt-spec {"1" :shuffle} it/identity-bolt) - "4" (bolt-spec - {"2" :shuffle - "3" :shuffle} + (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareShuffleGrouping)} + it/identity-bolt) + "3" (Thrift/prepareBoltDetails + {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareShuffleGrouping)} + it/identity-bolt) + "4" (Thrift/prepareBoltDetails + {(GlobalStreamId. "2" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareShuffleGrouping) + (GlobalStreamId. "3" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareShuffleGrouping)} (it/agg-bolt 4))}))] (.submitTopology cluster "test-acking2" @@ -139,9 +159,12 @@ (let [feeder (feeder-spout ["field1"]) tracker (AckFailMapTracker.) _ (.setAckFailDelegate feeder tracker) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder)} - {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)}) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareGlobalGrouping)} + it/ack-every-other)}) storm-conf (doto (Config.) (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))] (.submitTopology cluster @@ -170,9 +193,12 @@ (let [feeder (feeder-spout ["field1"]) tracker (AckFailMapTracker.) _ (.setAckFailDelegate feeder tracker) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder)} - {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)}) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID) + (Thrift/prepareGlobalGrouping)} + it/ack-every-other)}) storm-conf (doto (Config.) (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10) (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))] http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/clojure_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/clojure_test.clj b/storm-core/test/clj/org/apache/storm/clojure_test.clj index ccec825..13fdeb7 100644 --- a/storm-core/test/clj/org/apache/storm/clojure_test.clj +++ b/storm-core/test/clj/org/apache/storm/clojure_test.clj @@ -17,10 +17,12 @@ (:use [clojure test]) (:import [org.apache.storm.testing TestWordSpout TestPlannerSpout] [org.apache.storm.tuple Fields]) - (:use [org.apache.storm testing clojure config]) + (:use [org.apache.storm testing config]) + (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common]) - (:require [org.apache.storm [thrift :as thrift]])) - + (:require [org.apache.storm.internal [thrift :as thrift]]) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) (defbolt lalala-bolt1 ["word"] [[val :as tuple] collector] (let [ret (str val "lalala")] @@ -56,15 +58,21 @@ (deftest test-clojure-bolt (with-simulated-time-local-cluster [cluster :supervisors 4] (let [nimbus (:nimbus cluster) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. false))} - {"2" (thrift/mk-bolt-spec {"1" :shuffle} - lalala-bolt1) - "3" (thrift/mk-bolt-spec {"1" :local-or-shuffle} - lalala-bolt2) - "4" (thrift/mk-bolt-spec {"1" :shuffle} - (lalala-bolt3 "_nathan_"))} - ) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + lalala-bolt1) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareLocalOrShuffleGrouping)} + lalala-bolt2) + "4" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + (lalala-bolt3 "_nathan_"))} + ) results (complete-topology cluster topology :mock-sources {"1" [["david"] @@ -91,11 +99,12 @@ (deftest test-map-emit (with-simulated-time-local-cluster [cluster :supervisors 4] - (let [topology (thrift/mk-topology - {"words" (thrift/mk-spout-spec (TestWordSpout. false))} - {"out" (thrift/mk-bolt-spec {"words" :shuffle} - punctuator-bolt)} - ) + (let [topology (Thrift/buildTopology + {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))} + {"out" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "words" nil) + (Thrift/prepareShuffleGrouping)} + punctuator-bolt)}) results (complete-topology cluster topology :mock-sources {"words" [["foo"] ["bar"]]} @@ -115,14 +124,19 @@ (deftest test-component-specific-config-clojure (with-simulated-time-local-cluster [cluster] - (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])) :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}) - } - {"2" (bolt-spec {"1" :shuffle} - (conf-query-bolt {"fake.config" 1 - TOPOLOGY-MAX-TASK-PARALLELISM 2 - TOPOLOGY-MAX-SPOUT-PENDING 10}) - :conf {TOPOLOGY-MAX-SPOUT-PENDING 3}) - }) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. (Fields. ["conf"])) + nil + {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + (conf-query-bolt {"fake.config" 1 + TOPOLOGY-MAX-TASK-PARALLELISM 2 + TOPOLOGY-MAX-SPOUT-PENDING 10}) + nil + {TOPOLOGY-MAX-SPOUT-PENDING 3})}) results (complete-topology cluster topology :topology-name "test123" http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index b146cb0..18e3a80 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -30,7 +30,8 @@ (:require [conjure.core]) (:use [conjure core]) (:use [clojure test]) - (:use [org.apache.storm cluster config util testing thrift log])) + (:use [org.apache.storm cluster config util testing log]) + (:use [org.apache.storm.internal thrift])) (defn mk-config [zk-port] (merge (clojurify-structure (ConfigUtils/readStormConfig)) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/drpc_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj index 3dcef7a..6024674 100644 --- a/storm-core/test/clj/org/apache/storm/drpc_test.clj +++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj @@ -16,7 +16,8 @@ (ns org.apache.storm.drpc-test (:use [clojure test]) (:import [org.apache.storm.drpc ReturnResults DRPCSpout - LinearDRPCTopologyBuilder]) + LinearDRPCTopologyBuilder] + [org.apache.storm.utils ConfigUtils Utils]) (:import [org.apache.storm.topology FailedException]) (:import [org.apache.storm.coordination CoordinatedBolt$FinishedCallback]) (:import [org.apache.storm LocalDRPC LocalCluster]) @@ -25,7 +26,9 @@ [org.apache.storm.utils.staticmocking ConfigUtilsInstaller]) (:import [org.apache.storm.generated DRPCExecutionException]) (:import [java.util.concurrent ConcurrentLinkedQueue]) - (:use [org.apache.storm config testing clojure]) + (:import [org.apache.storm Thrift]) + (:use [org.apache.storm config testing]) + (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common drpc]) (:use [conjure core])) @@ -40,12 +43,16 @@ (let [drpc (LocalDRPC.) spout (DRPCSpout. "test" drpc) cluster (LocalCluster.) - topology (topology - {"1" (spout-spec spout)} - {"2" (bolt-spec {"1" :shuffle} - exclamation-bolt) - "3" (bolt-spec {"2" :shuffle} - (ReturnResults.))})] + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails spout)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + exclamation-bolt) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "2" nil) + (Thrift/prepareGlobalGrouping)} + (ReturnResults.))})] (.submitTopology cluster "test" {} topology) (is (= "aaa!!!" (.execute drpc "test" "aaa"))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/grouping_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj index f2a3f4b..61caf68 100644 --- a/storm-core/test/clj/org/apache/storm/grouping_test.clj +++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj @@ -18,9 +18,11 @@ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping] [org.apache.storm.generated JavaObject JavaObjectArg]) (:import [org.apache.storm.grouping LoadMapping]) - (:use [org.apache.storm testing clojure log config]) + (:use [org.apache.storm testing log config]) + (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common executor]) - (:require [org.apache.storm [thrift :as thrift]])) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) (deftest test-shuffle (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true} nil "comp" "stream") @@ -77,12 +79,13 @@ (with-simulated-time-local-cluster [cluster :supervisors 4] (let [spout-phint 4 bolt-phint 6 - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) - :parallelism-hint spout-phint)} - {"2" (thrift/mk-bolt-spec {"1" ["word"]} - (TestWordBytesCounter.) - :parallelism-hint bolt-phint) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestWordSpout. true) (Integer. spout-phint))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareFieldsGrouping ["word"])} + (TestWordBytesCounter.) (Integer. spout-phint)) }) results (complete-topology cluster @@ -101,12 +104,13 @@ (with-simulated-time-local-cluster [cluster :supervisors 4] (let [spout-phint 4 bolt-phint 6 - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) - :parallelism-hint spout-phint)} - {"2" (thrift/mk-bolt-spec {"1" ["word"]} - (TestWordBytesCounter.) - :parallelism-hint bolt-phint) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestWordSpout. true) (Integer. spout-phint))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareFieldsGrouping ["word"])} + (TestWordBytesCounter.) (Integer. bolt-phint)) }) results (complete-topology cluster @@ -127,15 +131,21 @@ (deftest test-custom-groupings (with-simulated-time-local-cluster [cluster] - (let [topology (topology - {"1" (spout-spec (TestWordSpout. true))} - {"2" (bolt-spec {"1" (NGrouping. 2)} - id-bolt - :p 4) - "3" (bolt-spec {"1" (JavaObject. "org.apache.storm.testing.NGrouping" - [(JavaObjectArg/int_arg 3)])} - id-bolt - :p 6) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestWordSpout. true))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareCustomStreamGrouping (NGrouping. (Integer. 2)))} + id-bolt + (Integer. 4)) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareCustomJavaObjectGrouping + (JavaObject. "org.apache.storm.testing.NGrouping" + [(JavaObjectArg/int_arg 3)]))} + id-bolt + (Integer. 6)) }) results (complete-topology cluster topology http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj index f75a8e3..7fffd34 100644 --- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj +++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj @@ -15,10 +15,11 @@ ;; limitations under the License. (ns org.apache.storm.messaging.netty-integration-test (:use [clojure test]) - (:import [org.apache.storm.messaging TransportFactory]) + (:import [org.apache.storm.messaging TransportFactory] + [org.apache.storm Thrift]) (:import [org.apache.storm.testing TestWordSpout TestGlobalCount]) - (:use [org.apache.storm testing config]) - (:require [org.apache.storm [thrift :as thrift]])) + (:import [org.apache.storm.utils Utils]) + (:use [org.apache.storm testing util config])) (deftest test-integration (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710 @@ -31,10 +32,13 @@ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1 STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1}] - (let [topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)} - {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.) - :parallelism-hint 6)}) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestWordSpout. true) (Integer. 4))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + (TestGlobalCount.) (Integer. 6))}) results (complete-topology cluster topology ;; important for test that http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/messaging_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/messaging_test.clj b/storm-core/test/clj/org/apache/storm/messaging_test.clj index e987688..402ea7f 100644 --- a/storm-core/test/clj/org/apache/storm/messaging_test.clj +++ b/storm-core/test/clj/org/apache/storm/messaging_test.clj @@ -18,7 +18,8 @@ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt]) (:use [org.apache.storm testing config]) (:use [org.apache.storm.daemon common]) - (:require [org.apache.storm [thrift :as thrift]])) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) (deftest test-local-transport (doseq [transport-on? [false true]] @@ -28,10 +29,13 @@ (if transport-on? true false) STORM-MESSAGING-TRANSPORT "org.apache.storm.messaging.netty.Context"}] - (let [topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 2)} - {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.) - :parallelism-hint 6) + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestWordSpout. true) (Integer. 2))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + (TestGlobalCount.) (Integer. 6)) }) results (complete-topology cluster topology http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/test/clj/org/apache/storm/metrics_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj index 9f051f6..c186288 100644 --- a/storm-core/test/clj/org/apache/storm/metrics_test.clj +++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj @@ -25,10 +25,12 @@ (:import [org.apache.storm.metric.api.rpc CountShellMetric]) (:import [org.apache.storm.utils Utils]) - (:use [org.apache.storm testing clojure config]) + (:use [org.apache.storm testing config]) + (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common]) (:use [org.apache.storm.metric testing]) - (:require [org.apache.storm [thrift :as thrift]])) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) (defbolt acking-bolt {} {:prepare true} [conf context collector] @@ -105,9 +107,12 @@ "storm.zookeeper.session.timeout" 60000 }] (let [feeder (feeder-spout ["field1"]) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder)} - {"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})] + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping)} + count-acks)})] (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) (.feed feeder ["a"] 1) @@ -133,9 +138,12 @@ "storm.zookeeper.session.timeout" 60000 }] (let [feeder (feeder-spout ["field1"]) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder)} - {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})] + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareAllGrouping)} + count-acks (Integer. 1) {TOPOLOGY-TASKS 2})})] (submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology) (.feed feeder ["a"] 1) @@ -154,10 +162,9 @@ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster)))) (defn mk-shell-bolt-with-metrics-spec - [inputs command & kwargs] - (let [command (into-array String command)] - (apply thrift/mk-bolt-spec inputs - (PythonShellMetricsBolt. command) kwargs))) + [inputs command file] + (Thrift/prepareBoltDetails inputs + (PythonShellMetricsBolt. command file))) (deftest test-custom-metric-with-multilang-py (with-simulated-time-local-cluster @@ -167,9 +174,12 @@ "storm.zookeeper.session.timeout" 60000 }] (let [feeder (feeder-spout ["field1"]) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder)} - {"2" (mk-shell-bolt-with-metrics-spec {"1" :global} ["python" "tester_bolt_metrics.py"])})] + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (mk-shell-bolt-with-metrics-spec + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping)} + "python" "tester_bolt_metrics.py")})] (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology) (.feed feeder ["a"] 1) @@ -189,9 +199,8 @@ ))) (defn mk-shell-spout-with-metrics-spec - [command & kwargs] - (let [command (into-array String command)] - (apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs))) + [command file] + (Thrift/prepareSpoutDetails (PythonShellMetricsSpout. command file))) (deftest test-custom-metric-with-spout-multilang-py (with-simulated-time-local-cluster @@ -199,9 +208,12 @@ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] "storm.zookeeper.connection.timeout" 30000 "storm.zookeeper.session.timeout" 60000}] - (let [topology (thrift/mk-topology - {"1" (mk-shell-spout-with-metrics-spec ["python" "tester_spout_metrics.py"])} - {"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})] + (let [topology (Thrift/buildTopology + {"1" (mk-shell-spout-with-metrics-spec "python" "tester_spout_metrics.py")} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareAllGrouping)} + count-acks)})] (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology) (advance-cluster-time cluster 7) @@ -216,9 +228,12 @@ TOPOLOGY-STATS-SAMPLE-RATE 1.0 TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}] (let [feeder (feeder-spout ["field1"]) - topology (thrift/mk-topology - {"myspout" (thrift/mk-spout-spec feeder)} - {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})] + topology (Thrift/buildTopology + {"myspout" (Thrift/prepareSpoutDetails feeder)} + {"mybolt" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "myspout" nil) + (Thrift/prepareShuffleGrouping)} + acking-bolt)})] (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) (.feed feeder ["a"] 1) @@ -255,9 +270,12 @@ (let [feeder (feeder-spout ["field1"]) tracker (AckFailMapTracker.) _ (.setAckFailDelegate feeder tracker) - topology (thrift/mk-topology - {"myspout" (thrift/mk-spout-spec feeder)} - {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})] + topology (Thrift/buildTopology + {"myspout" (Thrift/prepareSpoutDetails feeder)} + {"mybolt" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "myspout" nil) + (Thrift/prepareShuffleGrouping)} + ack-every-other)})] (submit-local-topology (:nimbus cluster) "metrics-tester" {} @@ -307,9 +325,12 @@ (let [feeder (feeder-spout ["field1"]) tracker (AckFailMapTracker.) _ (.setAckFailDelegate feeder tracker) - topology (thrift/mk-topology - {"myspout" (thrift/mk-spout-spec feeder)} - {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})] + topology (Thrift/buildTopology + {"myspout" (Thrift/prepareSpoutDetails feeder)} + {"mybolt" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "myspout" nil) + (Thrift/prepareGlobalGrouping)} + ack-every-other)})] (submit-local-topology (:nimbus cluster) "timeout-tester" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} @@ -341,8 +362,8 @@ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}] (let [feeder (feeder-spout ["field1"]) - topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec feeder)} + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} {})] (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
