backport thrift.clj to Thrift.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de9cb106 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de9cb106 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de9cb106 Branch: refs/heads/master Commit: de9cb106f3a68f8b13d16a39b242cecd7e5b0513 Parents: 4699990 Author: Sanket <schintap@untilservice-lm> Authored: Wed Feb 17 10:20:17 2016 -0600 Committer: Sanket <schintap@untilservice-lm> Committed: Wed Feb 17 10:20:17 2016 -0600 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 10 + .../org/apache/storm/starter/clj/word_count.clj | 3 +- pom.xml | 1 + storm-clojure/pom.xml | 74 ++++ .../src/clj/org/apache/storm/clojure.clj | 207 +++++++++++ .../src/clj/org/apache/storm/thrift.clj | 286 +++++++++++++++ storm-clojure/src/test/clj/clojure_test.clj | 158 +++++++++ storm-core/src/clj/org/apache/storm/clojure.clj | 207 ----------- .../clj/org/apache/storm/command/get_errors.clj | 3 +- .../clj/org/apache/storm/command/monitor.clj | 2 +- .../clj/org/apache/storm/command/rebalance.clj | 3 +- .../org/apache/storm/command/set_log_level.clj | 3 +- .../apache/storm/command/shell_submission.clj | 2 +- .../src/clj/org/apache/storm/daemon/common.clj | 121 ++++--- .../clj/org/apache/storm/daemon/executor.clj | 31 +- .../src/clj/org/apache/storm/daemon/task.clj | 4 +- .../clj/org/apache/storm/internal/clojure.clj | 201 +++++++++++ .../clj/org/apache/storm/internal/thrift.clj | 96 +++++ storm-core/src/clj/org/apache/storm/testing.clj | 29 +- storm-core/src/clj/org/apache/storm/thrift.clj | 286 --------------- storm-core/src/clj/org/apache/storm/ui/core.clj | 2 +- storm-core/src/jvm/org/apache/storm/Thrift.java | 351 +++++++++++++++++++ .../jvm/org/apache/storm/testing/NGrouping.java | 4 +- .../storm/testing/PythonShellMetricsBolt.java | 14 +- .../storm/testing/PythonShellMetricsSpout.java | 8 +- .../src/jvm/org/apache/storm/utils/Utils.java | 8 +- .../org/apache/storm/integration_test.clj | 259 +++++++------- .../org/apache/storm/testing4j_test.clj | 72 ++-- .../test/clj/org/apache/storm/clojure_test.clj | 64 ++-- .../test/clj/org/apache/storm/cluster_test.clj | 3 +- .../test/clj/org/apache/storm/drpc_test.clj | 23 +- .../test/clj/org/apache/storm/grouping_test.clj | 56 +-- .../storm/messaging/netty_integration_test.clj | 18 +- .../clj/org/apache/storm/messaging_test.clj | 14 +- .../test/clj/org/apache/storm/metrics_test.clj | 85 +++-- .../test/clj/org/apache/storm/nimbus_test.clj | 257 +++++++++----- .../scheduler/resource_aware_scheduler_test.clj | 3 +- .../clj/org/apache/storm/supervisor_test.clj | 154 ++++---- .../clj/org/apache/storm/tick_tuple_test.clj | 15 +- .../clj/org/apache/storm/transactional_test.clj | 3 +- 40 files changed, 2128 insertions(+), 1012 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index 1a7644a..929c8ea 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -83,6 +83,16 @@ <version>3.0.3</version> </dependency> <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-clojure</artifactId> + <version>${project.version}</version> + <!-- + Use "provided" scope to keep storm out of the jar-with-dependencies + For IntelliJ dev, intellij will load properly. + --> + <scope>${provided.scope}</scope> + </dependency> + <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj index fb3a695..c35cc1f 100644 --- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj +++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj @@ -14,7 +14,8 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.starter.clj.word-count - (:import [org.apache.storm StormSubmitter LocalCluster]) + (:import [org.apache.storm StormSubmitter LocalCluster] + [org.apache.storm.utils Utils]) (:use [org.apache.storm clojure config]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 61a1ed9..6d1a93e 100644 --- a/pom.xml +++ b/pom.xml @@ -271,6 +271,7 @@ <module>external/storm-cassandra</module> <module>external/storm-mqtt</module> <module>examples/storm-starter</module> + <module>storm-clojure</module> </modules> <dependencies> http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/pom.xml ---------------------------------------------------------------------- diff --git a/storm-clojure/pom.xml b/storm-clojure/pom.xml new file mode 100644 index 0000000..7ce4943 --- /dev/null +++ b/storm-clojure/pom.xml @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>storm-clojure</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.googlecode.json-simple</groupId> + <artifactId>json-simple</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>com.theoryinpractise</groupId> + <artifactId>clojure-maven-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <sourceDirectories> + <sourceDirectory>src/clj</sourceDirectory> + </sourceDirectories> + </configuration> + <executions> + <execution> + <id>compile</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/src/clj/org/apache/storm/clojure.clj ---------------------------------------------------------------------- diff --git a/storm-clojure/src/clj/org/apache/storm/clojure.clj b/storm-clojure/src/clj/org/apache/storm/clojure.clj new file mode 100644 index 0000000..9e1836f --- /dev/null +++ b/storm-clojure/src/clj/org/apache/storm/clojure.clj @@ -0,0 +1,207 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns org.apache.storm.clojure + (:use [org.apache.storm util]) + (:import [org.apache.storm StormSubmitter]) + (:import [org.apache.storm.generated StreamInfo]) + (:import [org.apache.storm.tuple Tuple]) + (:import [org.apache.storm.task OutputCollector IBolt TopologyContext]) + (:import [org.apache.storm.spout SpoutOutputCollector ISpout]) + (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm.clojure ClojureBolt ClojureSpout]) + (:import [java.util Collection List]) + (:require [org.apache.storm [thrift :as thrift]])) + +(defn direct-stream [fields] + (StreamInfo. fields true)) + +(defn to-spec [avar] + (let [m (meta avar)] + [(str (:ns m)) (str (:name m))])) + +(defn clojure-bolt* [output-spec fn-var conf-fn-var args] + (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec))) + +(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args] + `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args)) + +(defn clojure-spout* [output-spec fn-var conf-var args] + (let [m (meta fn-var)] + (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec)) + )) + +(defmacro clojure-spout [output-spec fn-sym conf-sym args] + `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args)) + +(defn normalize-fns [body] + (for [[name args & impl] body + :let [args (-> "this" + gensym + (cons args) + vec)]] + (concat [name args] impl) + )) + +(defmacro bolt [& body] + (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body) + fns (normalize-fns bolt-fns)] + `(reify IBolt + ~@fns + ~@other-fns))) + +(defmacro bolt-execute [& body] + `(bolt + (~'execute ~@body))) + +(defmacro spout [& body] + (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body) + fns (normalize-fns spout-fns)] + `(reify ISpout + ~@fns + ~@other-fns))) + +(defmacro defbolt [name output-spec & [opts & impl :as all]] + (if-not (map? opts) + `(defbolt ~name ~output-spec {} ~@all) + (let [worker-name (symbol (str name "__")) + conf-fn-name (symbol (str name "__conf__")) + params (:params opts) + conf-code (:conf opts) + fn-body (if (:prepare opts) + (cons 'fn impl) + (let [[args & impl-body] impl + coll-sym (nth args 1) + args (vec (take 1 args)) + prepargs [(gensym "conf") (gensym "context") coll-sym]] + `(fn ~prepargs (bolt (~'execute ~args ~@impl-body))))) + definer (if params + `(defn ~name [& args#] + (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#)) + `(def ~name + (clojure-bolt ~output-spec ~worker-name ~conf-fn-name [])) + ) + ] + `(do + (defn ~conf-fn-name ~(if params params []) + ~conf-code + ) + (defn ~worker-name ~(if params params []) + ~fn-body + ) + ~definer + )))) + +(defmacro defspout [name output-spec & [opts & impl :as all]] + (if-not (map? opts) + `(defspout ~name ~output-spec {} ~@all) + (let [worker-name (symbol (str name "__")) + conf-fn-name (symbol (str name "__conf__")) + params (:params opts) + conf-code (:conf opts) + prepare? (:prepare opts) + prepare? (if (nil? prepare?) true prepare?) + fn-body (if prepare? + (cons 'fn impl) + (let [[args & impl-body] impl + coll-sym (first args) + prepargs [(gensym "conf") (gensym "context") coll-sym]] + `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body))))) + definer (if params + `(defn ~name [& args#] + (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#)) + `(def ~name + (clojure-spout ~output-spec ~worker-name ~conf-fn-name [])) + ) + ] + `(do + (defn ~conf-fn-name ~(if params params []) + ~conf-code + ) + (defn ~worker-name ~(if params params []) + ~fn-body + ) + ~definer + )))) + +(defprotocol TupleValues + (tuple-values [values collector stream])) + +(extend-protocol TupleValues + java.util.Map + (tuple-values [this collector ^String stream] + (let [^TopologyContext context (:context collector) + fields (.. context (getThisOutputFields stream) toList) ] + (vec (map (into + (empty this) (for [[k v] this] + [(if (keyword? k) (name k) k) v])) + fields)))) + java.util.List + (tuple-values [this collector stream] + this)) + +(defn- collectify + [obj] + (if (or (sequential? obj) (instance? Collection obj)) + obj + [obj])) + +(defnk emit-bolt! [collector values + :stream Utils/DEFAULT_STREAM_ID :anchor []] + (let [^List anchor (collectify anchor) + values (tuple-values values collector stream) ] + (.emit ^OutputCollector (:output-collector collector) stream anchor values) + )) + +(defnk emit-direct-bolt! [collector task values + :stream Utils/DEFAULT_STREAM_ID :anchor []] + (let [^List anchor (collectify anchor) + values (tuple-values values collector stream) ] + (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values) + )) + +(defn ack! [collector ^Tuple tuple] + (.ack ^OutputCollector (:output-collector collector) tuple)) + +(defn fail! [collector ^Tuple tuple] + (.fail ^OutputCollector (:output-collector collector) tuple)) + +(defn report-error! [collector ^Tuple tuple] + (.reportError ^OutputCollector (:output-collector collector) tuple)) + +(defnk emit-spout! [collector values + :stream Utils/DEFAULT_STREAM_ID :id nil] + (let [values (tuple-values values collector stream)] + (.emit ^SpoutOutputCollector (:output-collector collector) stream values id))) + +(defnk emit-direct-spout! [collector task values + :stream Utils/DEFAULT_STREAM_ID :id nil] + (let [values (tuple-values values collector stream)] + (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id))) + +(defalias topology thrift/mk-topology) +(defalias bolt-spec thrift/mk-bolt-spec) +(defalias spout-spec thrift/mk-spout-spec) +(defalias shell-bolt-spec thrift/mk-shell-bolt-spec) +(defalias shell-spout-spec thrift/mk-shell-spout-spec) + +(defn submit-remote-topology [name conf topology] + (StormSubmitter/submitTopology name conf topology)) + +(defn local-cluster [] + ;; do this to avoid a cyclic dependency of + ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster + (eval '(new org.apache.storm.LocalCluster))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/src/clj/org/apache/storm/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-clojure/src/clj/org/apache/storm/thrift.clj b/storm-clojure/src/clj/org/apache/storm/thrift.clj new file mode 100644 index 0000000..bf13d23 --- /dev/null +++ b/storm-clojure/src/clj/org/apache/storm/thrift.clj @@ -0,0 +1,286 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns org.apache.storm.thrift + (:import [java.util HashMap] + [java.io Serializable] + [org.apache.storm.generated NodeInfo Assignment]) + (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology + StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface + ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo + GlobalStreamId ComponentObject ComponentObject$_Fields + ShellComponent SupervisorInfo]) + (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils]) + (:import [org.apache.storm Constants]) + (:import [org.apache.storm.security.auth ReqContext]) + (:import [org.apache.storm.grouping CustomStreamGrouping]) + (:import [org.apache.storm.topology TopologyBuilder]) + (:import [org.apache.storm.clojure RichShellBolt RichShellSpout]) + (:import [org.apache.storm.thrift.transport TTransport] + (org.json.simple JSONValue)) + (:use [org.apache.storm util config log zookeeper])) + +(defn instantiate-java-object + [^JavaObject obj] + (let [name (symbol (.get_full_class_name obj)) + args (map (memfn getFieldValue) (.get_args_list obj))] + (eval `(new ~name ~@args)))) + +(def grouping-constants + {Grouping$_Fields/FIELDS :fields + Grouping$_Fields/SHUFFLE :shuffle + Grouping$_Fields/ALL :all + Grouping$_Fields/NONE :none + Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized + Grouping$_Fields/CUSTOM_OBJECT :custom-object + Grouping$_Fields/DIRECT :direct + Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle}) + +(defn grouping-type + [^Grouping grouping] + (grouping-constants (.getSetField grouping))) + +(defn field-grouping + [^Grouping grouping] + (when-not (= (grouping-type grouping) :fields) + (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping"))) + (.get_fields grouping)) + +(defn global-grouping? + [^Grouping grouping] + (and (= :fields (grouping-type grouping)) + (empty? (field-grouping grouping)))) + +(defn parallelism-hint + [^ComponentCommon component-common] + (let [phint (.get_parallelism_hint component-common)] + (if-not (.is_set_parallelism_hint component-common) 1 phint))) + +(defn nimbus-client-and-conn + ([host port] + (nimbus-client-and-conn host port nil)) + ([host port as-user] + (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user) + (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) + nimbusClient (NimbusClient. conf host port nil as-user) + client (.getClient nimbusClient) + transport (.transport nimbusClient)] + [client transport] ))) + +(defmacro with-nimbus-connection + [[client-sym host port] & body] + `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)] + (try + ~@body + (finally (.close conn#))))) + +(defmacro with-configured-nimbus-connection + [client-sym & body] + `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig)) + context# (ReqContext/context) + user# (if (.principal context#) (.getName (.principal context#))) + nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#) + ~client-sym (.getClient nimbusClient#) + conn# (.transport nimbusClient#) + ] + (try + ~@body + (finally (.close conn#))))) + +(defn direct-output-fields + [fields] + (StreamInfo. fields true)) + +(defn output-fields + [fields] + (StreamInfo. fields false)) + +;TODO: when translating this function, you should replace the map-val with a proper for loop HERE +(defn mk-output-spec + [output-spec] + (let [output-spec (if (map? output-spec) + output-spec + {Utils/DEFAULT_STREAM_ID output-spec})] + (map-val + (fn [out] + (if (instance? StreamInfo out) + out + (StreamInfo. out false))) + output-spec))) + +(defnk mk-plain-component-common + [inputs output-spec parallelism-hint :conf nil] + (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))] + (when parallelism-hint + (.set_parallelism_hint ret parallelism-hint)) + (when conf + (.set_json_conf ret (JSONValue/toJSONString conf))) + ret)) + +(defnk mk-spout-spec* + [spout outputs :p nil :conf nil] + (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout)) + (mk-plain-component-common {} outputs p :conf conf))) + +(defn mk-shuffle-grouping + [] + (Grouping/shuffle (NullStruct.))) + +(defn mk-local-or-shuffle-grouping + [] + (Grouping/local_or_shuffle (NullStruct.))) + +(defn mk-fields-grouping + [fields] + (Grouping/fields fields)) + +(defn mk-global-grouping + [] + (mk-fields-grouping [])) + +(defn mk-direct-grouping + [] + (Grouping/direct (NullStruct.))) + +(defn mk-all-grouping + [] + (Grouping/all (NullStruct.))) + +(defn mk-none-grouping + [] + (Grouping/none (NullStruct.))) + +(defn deserialized-component-object + [^ComponentObject obj] + (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA) + (throw (RuntimeException. "Cannot deserialize non-java-serialized object"))) + (Utils/javaDeserialize (.get_serialized_java obj) Serializable)) + +(defn serialize-component-object + [obj] + (ComponentObject/serialized_java (Utils/javaSerialize obj))) + +(defn- mk-grouping + [grouping-spec] + (cond (nil? grouping-spec) + (mk-none-grouping) + + (instance? Grouping grouping-spec) + grouping-spec + + (instance? CustomStreamGrouping grouping-spec) + (Grouping/custom_serialized (Utils/javaSerialize grouping-spec)) + + (instance? JavaObject grouping-spec) + (Grouping/custom_object grouping-spec) + + (sequential? grouping-spec) + (mk-fields-grouping grouping-spec) + + (= grouping-spec :shuffle) + (mk-shuffle-grouping) + + (= grouping-spec :local-or-shuffle) + (mk-local-or-shuffle-grouping) + (= grouping-spec :none) + (mk-none-grouping) + + (= grouping-spec :all) + (mk-all-grouping) + + (= grouping-spec :global) + (mk-global-grouping) + + (= grouping-spec :direct) + (mk-direct-grouping) + + true + (throw (IllegalArgumentException. + (str grouping-spec " is not a valid grouping"))))) + +(defn- mk-inputs + [inputs] + (into {} (for [[stream-id grouping-spec] inputs] + [(if (sequential? stream-id) + (GlobalStreamId. (first stream-id) (second stream-id)) + (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID)) + (mk-grouping grouping-spec)]))) + +(defnk mk-bolt-spec* + [inputs bolt outputs :p nil :conf nil] + (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)] + (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt)) + common))) + +(defnk mk-spout-spec + [spout :parallelism-hint nil :p nil :conf nil] + (let [parallelism-hint (if p p parallelism-hint)] + {:obj spout :p parallelism-hint :conf conf})) + +(defn- shell-component-params + [command script-or-output-spec kwargs] + (if (string? script-or-output-spec) + [(into-array String [command script-or-output-spec]) + (first kwargs) + (rest kwargs)] + [(into-array String command) + script-or-output-spec + kwargs])) + +(defnk mk-bolt-spec + [inputs bolt :parallelism-hint nil :p nil :conf nil] + (let [parallelism-hint (if p p parallelism-hint)] + {:obj bolt :inputs inputs :p parallelism-hint :conf conf})) + +(defn mk-shell-bolt-spec + [inputs command script-or-output-spec & kwargs] + (let [[command output-spec kwargs] + (shell-component-params command script-or-output-spec kwargs)] + (apply mk-bolt-spec inputs + (RichShellBolt. command (mk-output-spec output-spec)) kwargs))) + +(defn mk-shell-spout-spec + [command script-or-output-spec & kwargs] + (let [[command output-spec kwargs] + (shell-component-params command script-or-output-spec kwargs)] + (apply mk-spout-spec + (RichShellSpout. command (mk-output-spec output-spec)) kwargs))) + +(defn- add-inputs + [declarer inputs] + (doseq [[id grouping] (mk-inputs inputs)] + (.grouping declarer id grouping))) + +(defn mk-topology + ([spout-map bolt-map] + (let [builder (TopologyBuilder.)] + (doseq [[name {spout :obj p :p conf :conf}] spout-map] + (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf))) + (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map] + (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs))) + (.createTopology builder))) + ([spout-map bolt-map state-spout-map] + (mk-topology spout-map bolt-map))) + +;; clojurify-structure is needed or else every element becomes the same after successive calls +;; don't know why this happens +(def STORM-TOPOLOGY-FIELDS + (-> StormTopology/metaDataMap clojurify-structure keys)) + +(def SPOUT-FIELDS + [StormTopology$_Fields/SPOUTS + StormTopology$_Fields/STATE_SPOUTS]) + http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/src/test/clj/clojure_test.clj ---------------------------------------------------------------------- diff --git a/storm-clojure/src/test/clj/clojure_test.clj b/storm-clojure/src/test/clj/clojure_test.clj new file mode 100644 index 0000000..50d3d29 --- /dev/null +++ b/storm-clojure/src/test/clj/clojure_test.clj @@ -0,0 +1,158 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.clojure-test + (:use [clojure test]) + (:import [org.apache.storm.testing TestWordSpout TestPlannerSpout] + [org.apache.storm.tuple Fields]) + (:use [org.apache.storm testing clojure config]) + (:use [org.apache.storm.daemon common]) + (:require [org.apache.storm [thrift :as thrift]]) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) + +(defbolt lalala-bolt1 ["word"] [[val :as tuple] collector] + (let [ret (str val "lalala")] + (emit-bolt! collector [ret] :anchor tuple) + (ack! collector tuple) + )) + +(defbolt lalala-bolt2 ["word"] {:prepare true} + [conf context collector] + (let [state (atom nil)] + (reset! state "lalala") + (bolt + (execute [tuple] + (let [ret (-> (.getValue tuple 0) (str @state))] + (emit-bolt! collector [ret] :anchor tuple) + (ack! collector tuple) + )) + ))) + +(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]} + [conf context collector] + (let [state (atom nil)] + (bolt + (prepare [_ _ _] + (reset! state (str prefix "lalala"))) + (execute [{val "word" :as tuple}] + (let [ret (-> (.getValue tuple 0) (str @state))] + (emit-bolt! collector [ret] :anchor tuple) + (ack! collector tuple) + ))) + )) + +(deftest test-clojure-bolt + (with-simulated-time-local-cluster [cluster :supervisors 4] + (let [nimbus (:nimbus cluster) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + lalala-bolt1) + "3" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareLocalOrShuffleGrouping)} + lalala-bolt2) + "4" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + (lalala-bolt3 "_nathan_"))} + ) + results (complete-topology cluster + topology + :mock-sources {"1" [["david"] + ["adam"] + ]} + )] + (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2"))) + (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3"))) + (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4"))) + ))) + +(defbolt punctuator-bolt ["word" "period" "question" "exclamation"] + [tuple collector] + (if (= (:word tuple) "bar") + (do + (emit-bolt! collector {:word "bar" :period "bar" :question "bar" + "exclamation" "bar"}) + (ack! collector tuple)) + (let [ res (assoc tuple :period (str (:word tuple) ".")) + res (assoc res :exclamation (str (:word tuple) "!")) + res (assoc res :question (str (:word tuple) "?")) ] + (emit-bolt! collector res) + (ack! collector tuple)))) + +(deftest test-map-emit + (with-simulated-time-local-cluster [cluster :supervisors 4] + (let [topology (Thrift/buildTopology + {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))} + {"out" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "words" nil) + (Thrift/prepareShuffleGrouping)} + punctuator-bolt)}) + results (complete-topology cluster + topology + :mock-sources {"words" [["foo"] ["bar"]]} + )] + (is (ms= [["foo" "foo." "foo?" "foo!"] + ["bar" "bar" "bar" "bar"]] (read-tuples results "out")))))) + +(defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf} + [conf context collector] + (bolt + (execute [tuple] + (let [name (.getValue tuple 0) + val (if (= name "!MAX_MSG_TIMEOUT") (.maxTopologyMessageTimeout context) (get conf name))] + (emit-bolt! collector [name val] :anchor tuple) + (ack! collector tuple)) + ))) + +(deftest test-component-specific-config-clojure + (with-simulated-time-local-cluster [cluster] + (let [topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. (Fields. ["conf"])) + nil + {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareShuffleGrouping)} + (conf-query-bolt {"fake.config" 1 + TOPOLOGY-MAX-TASK-PARALLELISM 2 + TOPOLOGY-MAX-SPOUT-PENDING 10}) + nil + {TOPOLOGY-MAX-SPOUT-PENDING 3})}) + results (complete-topology cluster + topology + :topology-name "test123" + :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10 + TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} + :mock-sources {"1" [["fake.config"] + [TOPOLOGY-MAX-TASK-PARALLELISM] + [TOPOLOGY-MAX-SPOUT-PENDING] + ["!MAX_MSG_TIMEOUT"] + [TOPOLOGY-NAME] + ]})] + (is (= {"fake.config" 1 + TOPOLOGY-MAX-TASK-PARALLELISM 2 + TOPOLOGY-MAX-SPOUT-PENDING 3 + "!MAX_MSG_TIMEOUT" 40 + TOPOLOGY-NAME "test123"} + (->> (read-tuples results "2") + (apply concat) + (apply hash-map)) + ))))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/clojure.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/clojure.clj b/storm-core/src/clj/org/apache/storm/clojure.clj deleted file mode 100644 index 9e1836f..0000000 --- a/storm-core/src/clj/org/apache/storm/clojure.clj +++ /dev/null @@ -1,207 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.clojure - (:use [org.apache.storm util]) - (:import [org.apache.storm StormSubmitter]) - (:import [org.apache.storm.generated StreamInfo]) - (:import [org.apache.storm.tuple Tuple]) - (:import [org.apache.storm.task OutputCollector IBolt TopologyContext]) - (:import [org.apache.storm.spout SpoutOutputCollector ISpout]) - (:import [org.apache.storm.utils Utils]) - (:import [org.apache.storm.clojure ClojureBolt ClojureSpout]) - (:import [java.util Collection List]) - (:require [org.apache.storm [thrift :as thrift]])) - -(defn direct-stream [fields] - (StreamInfo. fields true)) - -(defn to-spec [avar] - (let [m (meta avar)] - [(str (:ns m)) (str (:name m))])) - -(defn clojure-bolt* [output-spec fn-var conf-fn-var args] - (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec))) - -(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args] - `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args)) - -(defn clojure-spout* [output-spec fn-var conf-var args] - (let [m (meta fn-var)] - (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec)) - )) - -(defmacro clojure-spout [output-spec fn-sym conf-sym args] - `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args)) - -(defn normalize-fns [body] - (for [[name args & impl] body - :let [args (-> "this" - gensym - (cons args) - vec)]] - (concat [name args] impl) - )) - -(defmacro bolt [& body] - (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body) - fns (normalize-fns bolt-fns)] - `(reify IBolt - ~@fns - ~@other-fns))) - -(defmacro bolt-execute [& body] - `(bolt - (~'execute ~@body))) - -(defmacro spout [& body] - (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body) - fns (normalize-fns spout-fns)] - `(reify ISpout - ~@fns - ~@other-fns))) - -(defmacro defbolt [name output-spec & [opts & impl :as all]] - (if-not (map? opts) - `(defbolt ~name ~output-spec {} ~@all) - (let [worker-name (symbol (str name "__")) - conf-fn-name (symbol (str name "__conf__")) - params (:params opts) - conf-code (:conf opts) - fn-body (if (:prepare opts) - (cons 'fn impl) - (let [[args & impl-body] impl - coll-sym (nth args 1) - args (vec (take 1 args)) - prepargs [(gensym "conf") (gensym "context") coll-sym]] - `(fn ~prepargs (bolt (~'execute ~args ~@impl-body))))) - definer (if params - `(defn ~name [& args#] - (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#)) - `(def ~name - (clojure-bolt ~output-spec ~worker-name ~conf-fn-name [])) - ) - ] - `(do - (defn ~conf-fn-name ~(if params params []) - ~conf-code - ) - (defn ~worker-name ~(if params params []) - ~fn-body - ) - ~definer - )))) - -(defmacro defspout [name output-spec & [opts & impl :as all]] - (if-not (map? opts) - `(defspout ~name ~output-spec {} ~@all) - (let [worker-name (symbol (str name "__")) - conf-fn-name (symbol (str name "__conf__")) - params (:params opts) - conf-code (:conf opts) - prepare? (:prepare opts) - prepare? (if (nil? prepare?) true prepare?) - fn-body (if prepare? - (cons 'fn impl) - (let [[args & impl-body] impl - coll-sym (first args) - prepargs [(gensym "conf") (gensym "context") coll-sym]] - `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body))))) - definer (if params - `(defn ~name [& args#] - (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#)) - `(def ~name - (clojure-spout ~output-spec ~worker-name ~conf-fn-name [])) - ) - ] - `(do - (defn ~conf-fn-name ~(if params params []) - ~conf-code - ) - (defn ~worker-name ~(if params params []) - ~fn-body - ) - ~definer - )))) - -(defprotocol TupleValues - (tuple-values [values collector stream])) - -(extend-protocol TupleValues - java.util.Map - (tuple-values [this collector ^String stream] - (let [^TopologyContext context (:context collector) - fields (.. context (getThisOutputFields stream) toList) ] - (vec (map (into - (empty this) (for [[k v] this] - [(if (keyword? k) (name k) k) v])) - fields)))) - java.util.List - (tuple-values [this collector stream] - this)) - -(defn- collectify - [obj] - (if (or (sequential? obj) (instance? Collection obj)) - obj - [obj])) - -(defnk emit-bolt! [collector values - :stream Utils/DEFAULT_STREAM_ID :anchor []] - (let [^List anchor (collectify anchor) - values (tuple-values values collector stream) ] - (.emit ^OutputCollector (:output-collector collector) stream anchor values) - )) - -(defnk emit-direct-bolt! [collector task values - :stream Utils/DEFAULT_STREAM_ID :anchor []] - (let [^List anchor (collectify anchor) - values (tuple-values values collector stream) ] - (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values) - )) - -(defn ack! [collector ^Tuple tuple] - (.ack ^OutputCollector (:output-collector collector) tuple)) - -(defn fail! [collector ^Tuple tuple] - (.fail ^OutputCollector (:output-collector collector) tuple)) - -(defn report-error! [collector ^Tuple tuple] - (.reportError ^OutputCollector (:output-collector collector) tuple)) - -(defnk emit-spout! [collector values - :stream Utils/DEFAULT_STREAM_ID :id nil] - (let [values (tuple-values values collector stream)] - (.emit ^SpoutOutputCollector (:output-collector collector) stream values id))) - -(defnk emit-direct-spout! [collector task values - :stream Utils/DEFAULT_STREAM_ID :id nil] - (let [values (tuple-values values collector stream)] - (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id))) - -(defalias topology thrift/mk-topology) -(defalias bolt-spec thrift/mk-bolt-spec) -(defalias spout-spec thrift/mk-spout-spec) -(defalias shell-bolt-spec thrift/mk-shell-bolt-spec) -(defalias shell-spout-spec thrift/mk-shell-spout-spec) - -(defn submit-remote-topology [name conf topology] - (StormSubmitter/submitTopology name conf topology)) - -(defn local-cluster [] - ;; do this to avoid a cyclic dependency of - ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster - (eval '(new org.apache.storm.LocalCluster))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/get_errors.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/get_errors.clj b/storm-core/src/clj/org/apache/storm/command/get_errors.clj index 615a5f3..4f83a86 100644 --- a/storm-core/src/clj/org/apache/storm/command/get_errors.clj +++ b/storm-core/src/clj/org/apache/storm/command/get_errors.clj @@ -15,7 +15,8 @@ ;; limitations under the License. (ns org.apache.storm.command.get-errors (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm thrift log]) + (:use [org.apache.storm log]) + (:use [org.apache.storm.internal thrift]) (:use [org.apache.storm util]) (:require [org.apache.storm.daemon [nimbus :as nimbus] http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/monitor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/monitor.clj b/storm-core/src/clj/org/apache/storm/command/monitor.clj index 7fa9b2a..4ec49af 100644 --- a/storm-core/src/clj/org/apache/storm/command/monitor.clj +++ b/storm-core/src/clj/org/apache/storm/command/monitor.clj @@ -15,7 +15,7 @@ ;; limitations under the License. (ns org.apache.storm.command.monitor (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm.thrift :only [with-configured-nimbus-connection]]) + (:use [org.apache.storm.internal.thrift :only [with-configured-nimbus-connection]]) (:import [org.apache.storm.utils Monitor]) (:gen-class) ) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/rebalance.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj index 3868091..8428d14 100644 --- a/storm-core/src/clj/org/apache/storm/command/rebalance.clj +++ b/storm-core/src/clj/org/apache/storm/command/rebalance.clj @@ -15,7 +15,8 @@ ;; limitations under the License. (ns org.apache.storm.command.rebalance (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm thrift config log]) + (:use [org.apache.storm config log]) + (:use [org.apache.storm.internal thrift]) (:import [org.apache.storm.generated RebalanceOptions]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/set_log_level.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj index 7e1c3c5..6048246 100644 --- a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj +++ b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj @@ -15,7 +15,8 @@ ;; limitations under the License. (ns org.apache.storm.command.set-log-level (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm thrift log]) + (:use [org.apache.storm log]) + (:use [org.apache.storm.internal thrift]) (:import [org.apache.logging.log4j Level]) (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 0253338..0d29376 100644 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@ -17,7 +17,7 @@ (:import [org.apache.storm StormSubmitter] [org.apache.storm.utils Utils] [org.apache.storm.zookeeper Zookeeper]) - (:use [org.apache.storm thrift util config log zookeeper]) + (:use [org.apache.storm util config log zookeeper]) (:require [clojure.string :as str]) (:import [org.apache.storm.utils ConfigUtils]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj index eb1ec1e..db7fd40 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -16,7 +16,7 @@ (ns org.apache.storm.daemon.common (:use [org.apache.storm log config util]) (:import [org.apache.storm.generated StormTopology - InvalidTopologyException GlobalStreamId] + InvalidTopologyException GlobalStreamId Grouping Grouping$_Fields] [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils] [org.apache.storm.daemon.metrics.reporters PreparableReporter] [com.codahale.metrics MetricRegistry]) @@ -28,9 +28,11 @@ (:import [org.apache.storm.security.auth IAuthorizer]) (:import [java.io InterruptedIOException] [org.json.simple JSONValue]) - (:require [clojure.set :as set]) + (:import [java.util HashMap]) + (:import [org.apache.storm Thrift]) + (:require [clojure.set :as set]) (:require [org.apache.storm.daemon.acker :as acker]) - (:require [org.apache.storm.thrift :as thrift]) + (:require [metrics.reporters.jmx :as jmx]) (:require [metrics.core :refer [default-registry]])) (defn start-metrics-reporter [reporter conf] @@ -91,7 +93,7 @@ (defn topology-bases [storm-cluster-state] (let [active-topologies (.active-storms storm-cluster-state)] - (into {} + (into {} (dofor [id active-topologies] [id (.storm-base storm-cluster-state id nil)] )) @@ -117,12 +119,12 @@ ))))) (defn- validate-ids! [^StormTopology topology] - (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS) + (let [sets (map #(.getFieldValue topology %) (Thrift/getTopologyFields)) offending (apply set/intersection sets)] (if-not (empty? offending) (throw (InvalidTopologyException. (str "Duplicate component ids: " offending)))) - (doseq [f thrift/STORM-TOPOLOGY-FIELDS + (doseq [f (Thrift/getTopologyFields) :let [obj-map (.getFieldValue topology f)]] (if-not (ThriftTopologyUtils/isWorkerHook f) (do @@ -138,7 +140,7 @@ (defn all-components [^StormTopology topology] (apply merge {} - (for [f thrift/STORM-TOPOLOGY-FIELDS] + (for [f (Thrift/getTopologyFields)] (if-not (ThriftTopologyUtils/isWorkerHook f) (.getFieldValue topology f))))) @@ -151,13 +153,13 @@ (defn validate-basic! [^StormTopology topology] (validate-ids! topology) - (doseq [f thrift/SPOUT-FIELDS + (doseq [f (Thrift/getSpoutFields) obj (->> f (.getFieldValue topology) vals)] (if-not (empty? (-> obj .get_common .get_inputs)) (throw (InvalidTopologyException. "May not declare inputs for a spout")))) (doseq [[comp-id comp] (all-components topology) :let [conf (component-conf comp) - p (-> comp .get_common thrift/parallelism-hint)]] + p (-> comp .get_common (Thrift/getParallelismHint))]] (when (and (> (conf TOPOLOGY-TASKS) 0) p (<= p 0)) @@ -178,7 +180,7 @@ (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)] (if-not (contains? source-streams source-stream-id) (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]"))) - (if (= :fields (thrift/grouping-type grouping)) + (if (= Grouping$_Fields/FIELDS (Thrift/groupingType grouping)) (let [grouping-fields (set (.get_fields grouping)) source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set) diff-fields (set/difference grouping-fields source-stream-fields)] @@ -190,12 +192,15 @@ spout-ids (.. topology get_spouts keySet) spout-inputs (apply merge (for [id spout-ids] - {[id ACKER-INIT-STREAM-ID] ["id"]} + {(Utils/getGlobalStreamId id ACKER-INIT-STREAM-ID) + (Thrift/prepareFieldsGrouping ["id"])} )) bolt-inputs (apply merge (for [id bolt-ids] - {[id ACKER-ACK-STREAM-ID] ["id"] - [id ACKER-FAIL-STREAM-ID] ["id"]} + {(Utils/getGlobalStreamId id ACKER-ACK-STREAM-ID) + (Thrift/prepareFieldsGrouping ["id"]) + (Utils/getGlobalStreamId id ACKER-FAIL-STREAM-ID) + (Thrift/prepareFieldsGrouping ["id"])} ))] (merge spout-inputs bolt-inputs))) @@ -207,29 +212,31 @@ spout-ids (.. topology get_spouts keySet) spout-inputs (apply merge (for [id spout-ids] - {[id EVENTLOGGER-STREAM-ID] ["component-id"]} + {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID) + (Thrift/prepareFieldsGrouping ["component-id"])} )) bolt-inputs (apply merge (for [id bolt-ids] - {[id EVENTLOGGER-STREAM-ID] ["component-id"]} + {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID) + (Thrift/prepareFieldsGrouping ["component-id"])} ))] (merge spout-inputs bolt-inputs))) (defn add-acker! [storm-conf ^StormTopology ret] (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS)) - acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret) - (new org.apache.storm.daemon.acker) - {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"]) - ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"]) - } - :p num-executors - :conf {TOPOLOGY-TASKS num-executors - TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})] + acker-bolt (Thrift/prepareSerializedBoltDetails (acker-inputs ret) + (new org.apache.storm.daemon.acker) + {ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"]) + ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"]) + } + (Integer. num-executors) + {TOPOLOGY-TASKS num-executors + TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})] (dofor [[_ bolt] (.get_bolts ret) :let [common (.get_common bolt)]] (do - (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"])) - (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"])) + (.put_to_streams common ACKER-ACK-STREAM-ID (Thrift/outputFields ["id" "ack-val"])) + (.put_to_streams common ACKER-FAIL-STREAM-ID (Thrift/outputFields ["id"])) )) (dofor [[_ spout] (.get_spouts ret) :let [common (.get_common spout) @@ -239,13 +246,13 @@ (do ;; this set up tick tuples to cause timeouts to be triggered (.set_json_conf common (JSONValue/toJSONString spout-conf)) - (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"])) + (.put_to_streams common ACKER-INIT-STREAM-ID (Thrift/outputFields ["id" "init-val" "spout-task"])) (.put_to_inputs common (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID) - (thrift/mk-direct-grouping)) + (Thrift/prepareDirectGrouping)) (.put_to_inputs common (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID) - (thrift/mk-direct-grouping)) + (Thrift/prepareDirectGrouping)) )) (.put_to_bolts ret "__acker" acker-bolt) )) @@ -254,12 +261,12 @@ (doseq [[_ component] (all-components topology) :let [common (.get_common component)]] (.put_to_streams common METRICS-STREAM-ID - (thrift/output-fields ["task-info" "data-points"])))) + (Thrift/outputFields ["task-info" "data-points"])))) (defn add-system-streams! [^StormTopology topology] (doseq [[_ component] (all-components topology) :let [common (.get_common component)]] - (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"])))) + (.put_to_streams common SYSTEM-STREAM-ID (Thrift/outputFields ["event"])))) (defn map-occurrences [afn coll] @@ -280,7 +287,7 @@ "Generates a list of component ids for each metrics consumer e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] " [storm-conf] - (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) + (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) (map #(get % "class")) (number-duplicates) (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %)))) @@ -288,21 +295,22 @@ (defn metrics-consumer-bolt-specs [storm-conf topology] (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology))) inputs (->> (for [comp-id component-ids-that-emit-metrics] - {[comp-id METRICS-STREAM-ID] :shuffle}) + {(Utils/getGlobalStreamId comp-id METRICS-STREAM-ID) + (Thrift/prepareShuffleGrouping)}) (into {})) - mk-bolt-spec (fn [class arg p] - (thrift/mk-bolt-spec* - inputs - (org.apache.storm.metric.MetricsConsumerBolt. class arg) - {} :p p :conf {TOPOLOGY-TASKS p}))] - + (Thrift/prepareSerializedBoltDetails + inputs + (org.apache.storm.metric.MetricsConsumerBolt. class arg) + {} + (Integer. p) + {TOPOLOGY-TASKS p}))] + (map - (fn [component-id register] + (fn [component-id register] [component-id (mk-bolt-spec (get register "class") (get register "argument") (or (get register "parallelism.hint") 1))]) - (metrics-consumer-register-ids storm-conf) (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)))) @@ -313,32 +321,32 @@ (defn add-eventlogger! [storm-conf ^StormTopology ret] (let [num-executors (if (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) - eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret) - (EventLoggerBolt.) - {} - :p num-executors - :conf {TOPOLOGY-TASKS num-executors + eventlogger-bolt (Thrift/prepareSerializedBoltDetails (eventlogger-inputs ret) + (EventLoggerBolt.) + {} + (Integer. num-executors) + {TOPOLOGY-TASKS num-executors TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})] (doseq [[_ component] (all-components ret) :let [common (.get_common component)]] - (.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields)))) + (.put_to_streams common EVENTLOGGER-STREAM-ID (Thrift/outputFields (eventlogger-bolt-fields)))) (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt) )) -(defn add-metric-components! [storm-conf ^StormTopology topology] +(defn add-metric-components! [storm-conf ^StormTopology topology] (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)] (.put_to_bolts topology comp-id bolt-spec))) (defn add-system-components! [conf ^StormTopology topology] - (let [system-bolt-spec (thrift/mk-bolt-spec* + (let [system-bolt-spec (Thrift/prepareSerializedBoltDetails {} (SystemBolt.) - {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) - METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"]) - CREDENTIALS-CHANGED-STREAM-ID (thrift/output-fields ["creds"])} - :p 0 - :conf {TOPOLOGY-TASKS 0})] + {SYSTEM-TICK-STREAM-ID (Thrift/outputFields ["rate_secs"]) + METRICS-TICK-STREAM-ID (Thrift/outputFields ["interval"]) + CREDENTIALS-CHANGED-STREAM-ID (Thrift/outputFields ["creds"])} + (Integer. 0) + {TOPOLOGY-TASKS 0})] (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec))) (defn system-topology! [storm-conf ^StormTopology topology] @@ -361,7 +369,7 @@ (or (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (> (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS) 0))) (defn num-start-executors [component] - (thrift/parallelism-hint (.get_common component))) + (Thrift/getParallelismHint (.get_common component))) ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE (defn storm-task-info @@ -404,11 +412,10 @@ (defn mk-authorization-handler [klassname conf] (let [aznClass (if klassname (Class/forName klassname)) - aznHandler (if aznClass (.newInstance aznClass))] + aznHandler (if aznClass (.newInstance aznClass))] (if aznHandler (.prepare ^IAuthorizer aznHandler conf)) (log-debug "authorization class name:" klassname " class:" aznClass " handler:" aznHandler) aznHandler - )) - + )) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 3af365b..14a2f6e 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -15,11 +15,11 @@ ;; limitations under the License. (ns org.apache.storm.daemon.executor (:use [org.apache.storm.daemon common]) - (:import [org.apache.storm.generated Grouping] + (:import [org.apache.storm.generated Grouping Grouping$_Fields] [java.io Serializable]) (:use [org.apache.storm util config log timer stats]) (:import [java.util List Random HashMap ArrayList LinkedList Map]) - (:import [org.apache.storm ICredentialsListener]) + (:import [org.apache.storm ICredentialsListener Thrift]) (:import [org.apache.storm.hooks ITaskHook]) (:import [org.apache.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId]) (:import [org.apache.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector]) @@ -40,8 +40,7 @@ [java.util.concurrent ConcurrentLinkedQueue] [org.json.simple JSONValue] [com.lmax.disruptor.dsl ProducerType]) - (:require [org.apache.storm [thrift :as thrift] - [cluster :as cluster] [stats :as stats]]) + (:require [org.apache.storm [cluster :as cluster] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) @@ -77,38 +76,38 @@ (let [num-tasks (count target-tasks) random (Random.) target-tasks (vec (sort target-tasks))] - (condp = (thrift/grouping-type thrift-grouping) - :fields - (if (thrift/global-grouping? thrift-grouping) + (condp = (Thrift/groupingType thrift-grouping) + Grouping$_Fields/FIELDS + (if (Thrift/isGlobalGrouping thrift-grouping) (fn [task-id tuple load] ;; It's possible for target to have multiple tasks if it reads multiple sources (first target-tasks)) - (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))] + (let [group-fields (Fields. (Thrift/fieldGrouping thrift-grouping))] (mk-fields-grouper out-fields group-fields target-tasks) )) - :all + Grouping$_Fields/ALL (fn [task-id tuple load] target-tasks) - :shuffle + Grouping$_Fields/SHUFFLE (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id) - :local-or-shuffle + Grouping$_Fields/LOCAL_OR_SHUFFLE (let [same-tasks (set/intersection (set target-tasks) (set (.getThisWorkerTasks context)))] (if-not (empty? same-tasks) (mk-shuffle-grouper (vec same-tasks) topo-conf context component-id stream-id) (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id))) - :none + Grouping$_Fields/NONE (fn [task-id tuple load] (let [i (mod (.nextInt random) num-tasks)] (get target-tasks i) )) - :custom-object - (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))] + Grouping$_Fields/CUSTOM_OBJECT + (let [grouping (Thrift/instantiateJavaObject (.get_custom_object thrift-grouping))] (mk-custom-grouper grouping context component-id stream-id target-tasks)) - :custom-serialized + Grouping$_Fields/CUSTOM_SERIALIZED (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)] (mk-custom-grouper grouping context component-id stream-id target-tasks)) - :direct + Grouping$_Fields/DIRECT :direct ))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index a097e36..77abdec 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -27,8 +27,8 @@ (:import [org.apache.storm.generated ShellComponent JavaObject]) (:import [org.apache.storm.spout ShellSpout]) (:import [java.util Collection List ArrayList]) + (:import [org.apache.storm Thrift]) (:require [org.apache.storm - [thrift :as thrift] [stats :as stats]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])) @@ -83,7 +83,7 @@ (ShellBolt. obj)) obj ) obj (if (instance? JavaObject obj) - (thrift/instantiate-java-object obj) + (Thrift/instantiateJavaObject obj) obj )] obj )) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/internal/clojure.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/internal/clojure.clj b/storm-core/src/clj/org/apache/storm/internal/clojure.clj new file mode 100644 index 0000000..3f29757 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/internal/clojure.clj @@ -0,0 +1,201 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns org.apache.storm.internal.clojure + (:use [org.apache.storm util]) + (:import [org.apache.storm StormSubmitter]) + (:import [org.apache.storm.generated StreamInfo]) + (:import [org.apache.storm.tuple Tuple]) + (:import [org.apache.storm.task OutputCollector IBolt TopologyContext]) + (:import [org.apache.storm.spout SpoutOutputCollector ISpout]) + (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm.clojure ClojureBolt ClojureSpout]) + (:import [java.util Collection List]) + (:require [org.apache.storm.internal [thrift :as thrift]])) + +(defn direct-stream [fields] + (StreamInfo. fields true)) + +(defn to-spec [avar] + (let [m (meta avar)] + [(str (:ns m)) (str (:name m))])) + +(defn clojure-bolt* [output-spec fn-var conf-fn-var args] + (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec))) + +(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args] + `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args)) + +(defn clojure-spout* [output-spec fn-var conf-var args] + (let [m (meta fn-var)] + (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec)) + )) + +(defmacro clojure-spout [output-spec fn-sym conf-sym args] + `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args)) + +(defn normalize-fns [body] + (for [[name args & impl] body + :let [args (-> "this" + gensym + (cons args) + vec)]] + (concat [name args] impl) + )) + +(defmacro bolt [& body] + (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body) + fns (normalize-fns bolt-fns)] + `(reify IBolt + ~@fns + ~@other-fns))) + +(defmacro bolt-execute [& body] + `(bolt + (~'execute ~@body))) + +(defmacro spout [& body] + (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body) + fns (normalize-fns spout-fns)] + `(reify ISpout + ~@fns + ~@other-fns))) + +(defmacro defbolt [name output-spec & [opts & impl :as all]] + (if-not (map? opts) + `(defbolt ~name ~output-spec {} ~@all) + (let [worker-name (symbol (str name "__")) + conf-fn-name (symbol (str name "__conf__")) + params (:params opts) + conf-code (:conf opts) + fn-body (if (:prepare opts) + (cons 'fn impl) + (let [[args & impl-body] impl + coll-sym (nth args 1) + args (vec (take 1 args)) + prepargs [(gensym "conf") (gensym "context") coll-sym]] + `(fn ~prepargs (bolt (~'execute ~args ~@impl-body))))) + definer (if params + `(defn ~name [& args#] + (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#)) + `(def ~name + (clojure-bolt ~output-spec ~worker-name ~conf-fn-name [])) + ) + ] + `(do + (defn ~conf-fn-name ~(if params params []) + ~conf-code + ) + (defn ~worker-name ~(if params params []) + ~fn-body + ) + ~definer + )))) + +(defmacro defspout [name output-spec & [opts & impl :as all]] + (if-not (map? opts) + `(defspout ~name ~output-spec {} ~@all) + (let [worker-name (symbol (str name "__")) + conf-fn-name (symbol (str name "__conf__")) + params (:params opts) + conf-code (:conf opts) + prepare? (:prepare opts) + prepare? (if (nil? prepare?) true prepare?) + fn-body (if prepare? + (cons 'fn impl) + (let [[args & impl-body] impl + coll-sym (first args) + prepargs [(gensym "conf") (gensym "context") coll-sym]] + `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body))))) + definer (if params + `(defn ~name [& args#] + (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#)) + `(def ~name + (clojure-spout ~output-spec ~worker-name ~conf-fn-name [])) + ) + ] + `(do + (defn ~conf-fn-name ~(if params params []) + ~conf-code + ) + (defn ~worker-name ~(if params params []) + ~fn-body + ) + ~definer + )))) + +(defprotocol TupleValues + (tuple-values [values collector stream])) + +(extend-protocol TupleValues + java.util.Map + (tuple-values [this collector ^String stream] + (let [^TopologyContext context (:context collector) + fields (.. context (getThisOutputFields stream) toList) ] + (vec (map (into + (empty this) (for [[k v] this] + [(if (keyword? k) (name k) k) v])) + fields)))) + java.util.List + (tuple-values [this collector stream] + this)) + +(defn- collectify + [obj] + (if (or (sequential? obj) (instance? Collection obj)) + obj + [obj])) + +(defnk emit-bolt! [collector values + :stream Utils/DEFAULT_STREAM_ID :anchor []] + (let [^List anchor (collectify anchor) + values (tuple-values values collector stream) ] + (.emit ^OutputCollector (:output-collector collector) stream anchor values) + )) + +(defnk emit-direct-bolt! [collector task values + :stream Utils/DEFAULT_STREAM_ID :anchor []] + (let [^List anchor (collectify anchor) + values (tuple-values values collector stream) ] + (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values) + )) + +(defn ack! [collector ^Tuple tuple] + (.ack ^OutputCollector (:output-collector collector) tuple)) + +(defn fail! [collector ^Tuple tuple] + (.fail ^OutputCollector (:output-collector collector) tuple)) + +(defn report-error! [collector ^Tuple tuple] + (.reportError ^OutputCollector (:output-collector collector) tuple)) + +(defnk emit-spout! [collector values + :stream Utils/DEFAULT_STREAM_ID :id nil] + (let [values (tuple-values values collector stream)] + (.emit ^SpoutOutputCollector (:output-collector collector) stream values id))) + +(defnk emit-direct-spout! [collector task values + :stream Utils/DEFAULT_STREAM_ID :id nil] + (let [values (tuple-values values collector stream)] + (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id))) + +(defn submit-remote-topology [name conf topology] + (StormSubmitter/submitTopology name conf topology)) + +(defn local-cluster [] + ;; do this to avoid a cyclic dependency of + ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster + (eval '(new org.apache.storm.LocalCluster))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/internal/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/internal/thrift.clj b/storm-core/src/clj/org/apache/storm/internal/thrift.clj new file mode 100644 index 0000000..4ccf8a7 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/internal/thrift.clj @@ -0,0 +1,96 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns org.apache.storm.internal.thrift + (:import [java.util HashMap] + [java.io Serializable] + [org.apache.storm.generated NodeInfo Assignment]) + (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology + StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface + ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo + GlobalStreamId ComponentObject ComponentObject$_Fields + ShellComponent SupervisorInfo]) + (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils]) + (:import [org.apache.storm Constants]) + (:import [org.apache.storm.security.auth ReqContext]) + (:import [org.apache.storm.grouping CustomStreamGrouping]) + (:import [org.apache.storm.topology TopologyBuilder]) + (:import [org.apache.storm.clojure RichShellBolt RichShellSpout]) + (:import [org.apache.thrift.transport TTransport]) + (:use [org.apache.storm util config log zookeeper])) + +;; Leaving this definition as core.clj is using them as a nested keyword argument +;; Must remove once core.clj is ported to java +(def grouping-constants + {Grouping$_Fields/FIELDS :fields + Grouping$_Fields/SHUFFLE :shuffle + Grouping$_Fields/ALL :all + Grouping$_Fields/NONE :none + Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized + Grouping$_Fields/CUSTOM_OBJECT :custom-object + Grouping$_Fields/DIRECT :direct + Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle}) + +;; Leaving this method as core.clj is using them as a nested keyword argument +;; Must remove once core.clj is ported to java +(defn grouping-type + [^Grouping grouping] + (grouping-constants (.getSetField grouping))) + +(defn nimbus-client-and-conn + ([host port] + (nimbus-client-and-conn host port nil)) + ([host port as-user] + (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user) + (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) + nimbusClient (NimbusClient. conf host port nil as-user) + client (.getClient nimbusClient) + transport (.transport nimbusClient)] + [client transport] ))) + +(defmacro with-nimbus-connection + [[client-sym host port] & body] + `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)] + (try + ~@body + (finally (.close conn#))))) + +(defmacro with-configured-nimbus-connection + [client-sym & body] + `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig)) + context# (ReqContext/context) + user# (if (.principal context#) (.getName (.principal context#))) + nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#) + ~client-sym (.getClient nimbusClient#) + conn# (.transport nimbusClient#) + ] + (try + ~@body + (finally (.close conn#))))) + +;; Leaving this definition as core.clj is using them as a nested keyword argument +;; Must remove once core.clj is ported to java +(defn mk-output-spec + [output-spec] + (let [output-spec (if (map? output-spec) + output-spec + {Utils/DEFAULT_STREAM_ID output-spec})] + (map-val + (fn [out] + (if (instance? StreamInfo out) + out + (StreamInfo. out false))) + output-spec))) http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index c872742..4ad5ff8 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -44,13 +44,15 @@ (:import [org.apache.storm.transactional TransactionalSpoutCoordinator]) (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor]) (:import [org.apache.storm.tuple Tuple]) + (:import [org.apache.storm Thrift]) (:import [org.apache.storm.generated StormTopology]) (:import [org.apache.storm.task TopologyContext] (org.apache.storm.messaging IContext) [org.json.simple JSONValue]) (:require [org.apache.storm [zookeeper :as zk]]) (:require [org.apache.storm.daemon.acker :as acker]) - (:use [org.apache.storm cluster util thrift config log local-state])) + (:use [org.apache.storm cluster util config log local-state]) + (:use [org.apache.storm.internal thrift])) (defn feeder-spout [fields] @@ -526,7 +528,7 @@ (for [[_ spout-spec] spec-map] (-> spout-spec .get_spout_object - deserialized-component-object))) + (Thrift/deserializeComponentObject)))) (defn capture-topology [topology] @@ -543,11 +545,11 @@ (assoc (clojurify-structure bolts) (Utils/uuid) (Bolt. - (serialize-component-object capturer) - (mk-plain-component-common (into {} (for [[id direct?] all-streams] + (Thrift/serializeComponentObject capturer) + (Thrift/prepareComponentCommon (into {} (for [[id direct?] all-streams] [id (if direct? - (mk-direct-grouping) - (mk-global-grouping))])) + (Thrift/prepareDirectGrouping) + (Thrift/prepareGlobalGrouping))])) {} nil)))) {:topology topology @@ -577,7 +579,7 @@ mock-sources)] (doseq [[id spout] replacements] (let [spout-spec (get spouts id)] - (.set_spout_object spout-spec (serialize-component-object spout)))) + (.set_spout_object spout-spec (Thrift/serializeComponentObject spout)))) (doseq [spout (spout-objects spouts)] (when-not (extends? CompletableSpout (.getClass spout)) (throw (RuntimeException. (str "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " spout))))) @@ -636,12 +638,12 @@ (let [track-id (::track-id tracked-cluster) ret (.deepCopy topology)] (dofor [[_ bolt] (.get_bolts ret) - :let [obj (deserialized-component-object (.get_bolt_object bolt))]] - (.set_bolt_object bolt (serialize-component-object + :let [obj (Thrift/deserializeComponentObject (.get_bolt_object bolt))]] + (.set_bolt_object bolt (Thrift/serializeComponentObject (BoltTracker. obj track-id)))) (dofor [[_ spout] (.get_spouts ret) - :let [obj (deserialized-component-object (.get_spout_object spout))]] - (.set_spout_object spout (serialize-component-object + :let [obj (Thrift/deserializeComponentObject (.get_spout_object spout))]] + (.set_spout_object spout (Thrift/serializeComponentObject (SpoutTracker. obj track-id)))) {:topology ret :last-spout-emit (atom 0) @@ -723,8 +725,9 @@ (->> (iterate inc 1) (take (count values)) (map #(str "field" %)))) - spout-spec (mk-spout-spec* (TestWordSpout.) - {stream fields}) + spout-spec (Thrift/prepareSerializedSpoutDetails + (TestWordSpout.) + {stream fields}) topology (StormTopology. {component spout-spec} {} {}) context (TopologyContext. topology
