backport thrift.clj to Thrift.java

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de9cb106
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de9cb106
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de9cb106

Branch: refs/heads/master
Commit: de9cb106f3a68f8b13d16a39b242cecd7e5b0513
Parents: 4699990
Author: Sanket <schintap@untilservice-lm>
Authored: Wed Feb 17 10:20:17 2016 -0600
Committer: Sanket <schintap@untilservice-lm>
Committed: Wed Feb 17 10:20:17 2016 -0600

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  10 +
 .../org/apache/storm/starter/clj/word_count.clj |   3 +-
 pom.xml                                         |   1 +
 storm-clojure/pom.xml                           |  74 ++++
 .../src/clj/org/apache/storm/clojure.clj        | 207 +++++++++++
 .../src/clj/org/apache/storm/thrift.clj         | 286 +++++++++++++++
 storm-clojure/src/test/clj/clojure_test.clj     | 158 +++++++++
 storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
 .../clj/org/apache/storm/command/get_errors.clj |   3 +-
 .../clj/org/apache/storm/command/monitor.clj    |   2 +-
 .../clj/org/apache/storm/command/rebalance.clj  |   3 +-
 .../org/apache/storm/command/set_log_level.clj  |   3 +-
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon/common.clj  | 121 ++++---
 .../clj/org/apache/storm/daemon/executor.clj    |  31 +-
 .../src/clj/org/apache/storm/daemon/task.clj    |   4 +-
 .../clj/org/apache/storm/internal/clojure.clj   | 201 +++++++++++
 .../clj/org/apache/storm/internal/thrift.clj    |  96 +++++
 storm-core/src/clj/org/apache/storm/testing.clj |  29 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  | 286 ---------------
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/jvm/org/apache/storm/Thrift.java | 351 +++++++++++++++++++
 .../jvm/org/apache/storm/testing/NGrouping.java |   4 +-
 .../storm/testing/PythonShellMetricsBolt.java   |  14 +-
 .../storm/testing/PythonShellMetricsSpout.java  |   8 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   8 +-
 .../org/apache/storm/integration_test.clj       | 259 +++++++-------
 .../org/apache/storm/testing4j_test.clj         |  72 ++--
 .../test/clj/org/apache/storm/clojure_test.clj  |  64 ++--
 .../test/clj/org/apache/storm/cluster_test.clj  |   3 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../test/clj/org/apache/storm/grouping_test.clj |  56 +--
 .../storm/messaging/netty_integration_test.clj  |  18 +-
 .../clj/org/apache/storm/messaging_test.clj     |  14 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |  85 +++--
 .../test/clj/org/apache/storm/nimbus_test.clj   | 257 +++++++++-----
 .../scheduler/resource_aware_scheduler_test.clj |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 154 ++++----
 .../clj/org/apache/storm/tick_tuple_test.clj    |  15 +-
 .../clj/org/apache/storm/transactional_test.clj |   3 +-
 40 files changed, 2128 insertions(+), 1012 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 1a7644a..929c8ea 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -83,6 +83,16 @@
      <version>3.0.3</version>
     </dependency>
     <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-clojure</artifactId>
+        <version>${project.version}</version>
+        <!--
+          Use "provided" scope to keep storm out of the jar-with-dependencies
+          For IntelliJ dev, intellij will load properly.
+        -->
+        <scope>${provided.scope}</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.storm</groupId>
       <artifactId>storm-core</artifactId>
       <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
index fb3a695..c35cc1f 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
@@ -14,7 +14,8 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.starter.clj.word-count
-  (:import [org.apache.storm StormSubmitter LocalCluster])
+  (:import [org.apache.storm StormSubmitter LocalCluster]
+           [org.apache.storm.utils Utils])
   (:use [org.apache.storm clojure config])
   (:gen-class))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 61a1ed9..6d1a93e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -271,6 +271,7 @@
         <module>external/storm-cassandra</module>
         <module>external/storm-mqtt</module>
         <module>examples/storm-starter</module>
+        <module>storm-clojure</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/pom.xml
----------------------------------------------------------------------
diff --git a/storm-clojure/pom.xml b/storm-clojure/pom.xml
new file mode 100644
index 0000000..7ce4943
--- /dev/null
+++ b/storm-clojure/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>storm-clojure</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.theoryinpractise</groupId>
+                <artifactId>clojure-maven-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <sourceDirectories>
+                        <sourceDirectory>src/clj</sourceDirectory>
+                    </sourceDirectories>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <phase>compile</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/clojure.clj b/storm-clojure/src/clj/org/apache/storm/clojure.clj
new file mode 100644
index 0000000..9e1836f
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/clojure.clj
@@ -0,0 +1,207 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.clojure
+  (:use [org.apache.storm util])
+  (:import [org.apache.storm StormSubmitter])
+  (:import [org.apache.storm.generated StreamInfo])
+  (:import [org.apache.storm.tuple Tuple])
+  (:import [org.apache.storm.task OutputCollector IBolt TopologyContext])
+  (:import [org.apache.storm.spout SpoutOutputCollector ISpout])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
+  (:import [java.util Collection List])
+  (:require [org.apache.storm [thrift :as thrift]]))
+
+(defn direct-stream [fields]
+  (StreamInfo. fields true))
+
+(defn to-spec [avar]
+  (let [m (meta avar)]
+    [(str (:ns m)) (str (:name m))]))
+
+(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
+  (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec)))
+
+(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
+  `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args))
+
+(defn clojure-spout* [output-spec fn-var conf-var args]
+  (let [m (meta fn-var)]
+    (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec))
+    ))
+
+(defmacro clojure-spout [output-spec fn-sym conf-sym args]
+  `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args))
+
+(defn normalize-fns [body]
+  (for [[name args & impl] body
+        :let [args (-> "this"
+                       gensym
+                       (cons args)
+                       vec)]]
+    (concat [name args] impl)
+    ))
+
+(defmacro bolt [& body]
+  (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
+        fns (normalize-fns bolt-fns)]
+    `(reify IBolt
+       ~@fns
+       ~@other-fns)))
+
+(defmacro bolt-execute [& body]
+  `(bolt
+     (~'execute ~@body)))
+
+(defmacro spout [& body]
+  (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
+        fns (normalize-fns spout-fns)]
+    `(reify ISpout
+       ~@fns
+       ~@other-fns)))
+
+(defmacro defbolt [name output-spec & [opts & impl :as all]]
+  (if-not (map? opts)
+    `(defbolt ~name ~output-spec {} ~@all)
+    (let [worker-name (symbol (str name "__"))
+          conf-fn-name (symbol (str name "__conf__"))
+          params (:params opts)
+          conf-code (:conf opts)
+          fn-body (if (:prepare opts)
+                    (cons 'fn impl)
+                    (let [[args & impl-body] impl
+                          coll-sym (nth args 1)
+                          args (vec (take 1 args))
+                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
+                      `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
+          definer (if params
+                    `(defn ~name [& args#]
+                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
+                    `(def ~name
+                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
+                    )
+          ]
+      `(do
+         (defn ~conf-fn-name ~(if params params [])
+           ~conf-code
+           )
+         (defn ~worker-name ~(if params params [])
+           ~fn-body
+           )
+         ~definer
+         ))))
+
+(defmacro defspout [name output-spec & [opts & impl :as all]]
+  (if-not (map? opts)
+    `(defspout ~name ~output-spec {} ~@all)
+    (let [worker-name (symbol (str name "__"))
+          conf-fn-name (symbol (str name "__conf__"))
+          params (:params opts)
+          conf-code (:conf opts)
+          prepare? (:prepare opts)
+          prepare? (if (nil? prepare?) true prepare?)
+          fn-body (if prepare?
+                    (cons 'fn impl)
+                    (let [[args & impl-body] impl
+                          coll-sym (first args)
+                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
+                      `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
+          definer (if params
+                    `(defn ~name [& args#]
+                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
+                    `(def ~name
+                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
+                    )
+          ]
+      `(do
+         (defn ~conf-fn-name ~(if params params [])
+           ~conf-code
+           )
+         (defn ~worker-name ~(if params params [])
+           ~fn-body
+           )
+         ~definer
+         ))))
+
+(defprotocol TupleValues
+  (tuple-values [values collector stream]))
+
+(extend-protocol TupleValues
+  java.util.Map
+  (tuple-values [this collector ^String stream]
+    (let [^TopologyContext context (:context collector)
+          fields (..  context (getThisOutputFields stream) toList) ]
+      (vec (map (into
+                  (empty this) (for [[k v] this]
+                                   [(if (keyword? k) (name k) k) v]))
+                fields))))
+  java.util.List
+  (tuple-values [this collector stream]
+    this))
+
+(defn- collectify
+  [obj]
+  (if (or (sequential? obj) (instance? Collection obj))
+    obj
+    [obj]))
+
+(defnk emit-bolt! [collector values
+                   :stream Utils/DEFAULT_STREAM_ID :anchor []]
+  (let [^List anchor (collectify anchor)
+        values (tuple-values values collector stream) ]
+    (.emit ^OutputCollector (:output-collector collector) stream anchor values)
+    ))
+
+(defnk emit-direct-bolt! [collector task values
+                          :stream Utils/DEFAULT_STREAM_ID :anchor []]
+  (let [^List anchor (collectify anchor)
+        values (tuple-values values collector stream) ]
+    (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values)
+    ))
+
+(defn ack! [collector ^Tuple tuple]
+  (.ack ^OutputCollector (:output-collector collector) tuple))
+
+(defn fail! [collector ^Tuple tuple]
+  (.fail ^OutputCollector (:output-collector collector) tuple))
+
+(defn report-error! [collector ^Tuple tuple]
+  (.reportError ^OutputCollector (:output-collector collector) tuple))
+
+(defnk emit-spout! [collector values
+                    :stream Utils/DEFAULT_STREAM_ID :id nil]
+  (let [values (tuple-values values collector stream)]
+    (.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))
+
+(defnk emit-direct-spout! [collector task values
+                           :stream Utils/DEFAULT_STREAM_ID :id nil]
+  (let [values (tuple-values values collector stream)]
+    (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
+
+(defalias topology thrift/mk-topology)
+(defalias bolt-spec thrift/mk-bolt-spec)
+(defalias spout-spec thrift/mk-spout-spec)
+(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
+(defalias shell-spout-spec thrift/mk-shell-spout-spec)
+
+(defn submit-remote-topology [name conf topology]
+  (StormSubmitter/submitTopology name conf topology))
+
+(defn local-cluster []
+  ;; do this to avoid a cyclic dependency of
+  ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
+  (eval '(new org.apache.storm.LocalCluster)))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/thrift.clj b/storm-clojure/src/clj/org/apache/storm/thrift.clj
new file mode 100644
index 0000000..bf13d23
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/thrift.clj
@@ -0,0 +1,286 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.thrift
+  (:import [java.util HashMap]
+           [java.io Serializable]
+           [org.apache.storm.generated NodeInfo Assignment])
+  (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology
+            StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
+            ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
+            GlobalStreamId ComponentObject ComponentObject$_Fields
+            ShellComponent SupervisorInfo])
+  (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils])
+  (:import [org.apache.storm Constants])
+  (:import [org.apache.storm.security.auth ReqContext])
+  (:import [org.apache.storm.grouping CustomStreamGrouping])
+  (:import [org.apache.storm.topology TopologyBuilder])
+  (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
+  (:import [org.apache.storm.thrift.transport TTransport]
+           (org.json.simple JSONValue))
+  (:use [org.apache.storm util config log zookeeper]))
+
+(defn instantiate-java-object
+  [^JavaObject obj]
+  (let [name (symbol (.get_full_class_name obj))
+        args (map (memfn getFieldValue) (.get_args_list obj))]
+    (eval `(new ~name ~@args))))
+
+(def grouping-constants
+  {Grouping$_Fields/FIELDS :fields
+   Grouping$_Fields/SHUFFLE :shuffle
+   Grouping$_Fields/ALL :all
+   Grouping$_Fields/NONE :none
+   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
+   Grouping$_Fields/CUSTOM_OBJECT :custom-object
+   Grouping$_Fields/DIRECT :direct
+   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
+
+(defn grouping-type
+  [^Grouping grouping]
+  (grouping-constants (.getSetField grouping)))
+
+(defn field-grouping
+  [^Grouping grouping]
+  (when-not (= (grouping-type grouping) :fields)
+    (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping")))
+  (.get_fields grouping))
+
+(defn global-grouping?
+  [^Grouping grouping]
+  (and (= :fields (grouping-type grouping))
+       (empty? (field-grouping grouping))))
+
+(defn parallelism-hint
+  [^ComponentCommon component-common]
+  (let [phint (.get_parallelism_hint component-common)]
+    (if-not (.is_set_parallelism_hint component-common) 1 phint)))
+
+(defn nimbus-client-and-conn
+  ([host port]
+    (nimbus-client-and-conn host port nil))
+  ([host port as-user]
+  (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
+  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
+        nimbusClient (NimbusClient. conf host port nil as-user)
+        client (.getClient nimbusClient)
+        transport (.transport nimbusClient)]
+        [client transport] )))
+
+(defmacro with-nimbus-connection
+  [[client-sym host port] & body]
+  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
+    (try
+      ~@body
+    (finally (.close conn#)))))
+
+(defmacro with-configured-nimbus-connection
+  [client-sym & body]
+  `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
+         context# (ReqContext/context)
+         user# (if (.principal context#) (.getName (.principal context#)))
+         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
+         ~client-sym (.getClient nimbusClient#)
+         conn# (.transport nimbusClient#)
+         ]
+     (try
+       ~@body
+     (finally (.close conn#)))))
+
+(defn direct-output-fields
+  [fields]
+  (StreamInfo. fields true))
+
+(defn output-fields
+  [fields]
+  (StreamInfo. fields false))
+
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+(defn mk-output-spec
+  [output-spec]
+  (let [output-spec (if (map? output-spec)
+                      output-spec
+                      {Utils/DEFAULT_STREAM_ID output-spec})]
+    (map-val
+      (fn [out]
+        (if (instance? StreamInfo out)
+          out
+          (StreamInfo. out false)))
+      output-spec)))
+
+(defnk mk-plain-component-common
+  [inputs output-spec parallelism-hint :conf nil]
+  (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))]
+    (when parallelism-hint
+      (.set_parallelism_hint ret parallelism-hint))
+    (when conf
+      (.set_json_conf ret (JSONValue/toJSONString conf)))
+    ret))
+
+(defnk mk-spout-spec*
+  [spout outputs :p nil :conf nil]
+  (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
+              (mk-plain-component-common {} outputs p :conf conf)))
+
+(defn mk-shuffle-grouping
+  []
+  (Grouping/shuffle (NullStruct.)))
+
+(defn mk-local-or-shuffle-grouping
+  []
+  (Grouping/local_or_shuffle (NullStruct.)))
+
+(defn mk-fields-grouping
+  [fields]
+  (Grouping/fields fields))
+
+(defn mk-global-grouping
+  []
+  (mk-fields-grouping []))
+
+(defn mk-direct-grouping
+  []
+  (Grouping/direct (NullStruct.)))
+
+(defn mk-all-grouping
+  []
+  (Grouping/all (NullStruct.)))
+
+(defn mk-none-grouping
+  []
+  (Grouping/none (NullStruct.)))
+
+(defn deserialized-component-object
+  [^ComponentObject obj]
+  (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
+    (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
+  (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
+
+(defn serialize-component-object
+  [obj]
+  (ComponentObject/serialized_java (Utils/javaSerialize obj)))
+
+(defn- mk-grouping
+  [grouping-spec]
+  (cond (nil? grouping-spec)
+        (mk-none-grouping)
+
+        (instance? Grouping grouping-spec)
+        grouping-spec
+
+        (instance? CustomStreamGrouping grouping-spec)
+        (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
+
+        (instance? JavaObject grouping-spec)
+        (Grouping/custom_object grouping-spec)
+
+        (sequential? grouping-spec)
+        (mk-fields-grouping grouping-spec)
+
+        (= grouping-spec :shuffle)
+        (mk-shuffle-grouping)
+
+        (= grouping-spec :local-or-shuffle)
+        (mk-local-or-shuffle-grouping)
+        (= grouping-spec :none)
+        (mk-none-grouping)
+
+        (= grouping-spec :all)
+        (mk-all-grouping)
+
+        (= grouping-spec :global)
+        (mk-global-grouping)
+
+        (= grouping-spec :direct)
+        (mk-direct-grouping)
+
+        true
+        (throw (IllegalArgumentException.
+                 (str grouping-spec " is not a valid grouping")))))
+
+(defn- mk-inputs
+  [inputs]
+  (into {} (for [[stream-id grouping-spec] inputs]
+             [(if (sequential? stream-id)
+                (GlobalStreamId. (first stream-id) (second stream-id))
+                (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
+              (mk-grouping grouping-spec)])))
+
+(defnk mk-bolt-spec*
+  [inputs bolt outputs :p nil :conf nil]
+  (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
+    (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
+           common)))
+
+(defnk mk-spout-spec
+  [spout :parallelism-hint nil :p nil :conf nil]
+  (let [parallelism-hint (if p p parallelism-hint)]
+    {:obj spout :p parallelism-hint :conf conf}))
+
+(defn- shell-component-params
+  [command script-or-output-spec kwargs]
+  (if (string? script-or-output-spec)
+    [(into-array String [command script-or-output-spec])
+     (first kwargs)
+     (rest kwargs)]
+    [(into-array String command)
+     script-or-output-spec
+     kwargs]))
+
+(defnk mk-bolt-spec
+  [inputs bolt :parallelism-hint nil :p nil :conf nil]
+  (let [parallelism-hint (if p p parallelism-hint)]
+    {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
+
+(defn mk-shell-bolt-spec
+  [inputs command script-or-output-spec & kwargs]
+  (let [[command output-spec kwargs]
+        (shell-component-params command script-or-output-spec kwargs)]
+    (apply mk-bolt-spec inputs
+           (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
+
+(defn mk-shell-spout-spec
+  [command script-or-output-spec & kwargs]
+  (let [[command output-spec kwargs]
+        (shell-component-params command script-or-output-spec kwargs)]
+    (apply mk-spout-spec
+           (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))
+
+(defn- add-inputs
+  [declarer inputs]
+  (doseq [[id grouping] (mk-inputs inputs)]
+    (.grouping declarer id grouping)))
+
+(defn mk-topology
+  ([spout-map bolt-map]
+   (let [builder (TopologyBuilder.)]
+     (doseq [[name {spout :obj p :p conf :conf}] spout-map]
+       (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
+     (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
+       (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
+     (.createTopology builder)))
+  ([spout-map bolt-map state-spout-map]
+   (mk-topology spout-map bolt-map)))
+
+;; clojurify-structure is needed or else every element becomes the same after successive calls
+;; don't know why this happens
+(def STORM-TOPOLOGY-FIELDS
+  (-> StormTopology/metaDataMap clojurify-structure keys))
+
+(def SPOUT-FIELDS
+  [StormTopology$_Fields/SPOUTS
+   StormTopology$_Fields/STATE_SPOUTS])
+

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-clojure/src/test/clj/clojure_test.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/test/clj/clojure_test.clj b/storm-clojure/src/test/clj/clojure_test.clj
new file mode 100644
index 0000000..50d3d29
--- /dev/null
+++ b/storm-clojure/src/test/clj/clojure_test.clj
@@ -0,0 +1,158 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.clojure-test
+  (:use [clojure test])
+  (:import [org.apache.storm.testing TestWordSpout TestPlannerSpout]
+           [org.apache.storm.tuple Fields])
+  (:use [org.apache.storm testing clojure config])
+  (:use [org.apache.storm.daemon common])
+  (:require [org.apache.storm [thrift :as thrift]])
+  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.utils Utils]))
+
+(defbolt lalala-bolt1 ["word"] [[val :as tuple] collector]
+  (let [ret (str val "lalala")]
+    (emit-bolt! collector [ret] :anchor tuple)
+    (ack! collector tuple)
+    ))
+
+(defbolt lalala-bolt2 ["word"] {:prepare true}
+  [conf context collector]
+  (let [state (atom nil)]
+    (reset! state "lalala")
+    (bolt
+      (execute [tuple]
+        (let [ret (-> (.getValue tuple 0) (str @state))]
+                (emit-bolt! collector [ret] :anchor tuple)
+                (ack! collector tuple)
+                ))
+      )))
+      
+(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
+  [conf context collector]
+  (let [state (atom nil)]
+    (bolt
+      (prepare [_ _ _]
+        (reset! state (str prefix "lalala")))
+      (execute [{val "word" :as tuple}]
+        (let [ret (-> (.getValue tuple 0) (str @state))]
+          (emit-bolt! collector [ret] :anchor tuple)
+          (ack! collector tuple)
+          )))
+    ))
+
+(deftest test-clojure-bolt
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [nimbus (:nimbus cluster)
+          topology (Thrift/buildTopology
+                      {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
+                      {"2" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareShuffleGrouping)}
+                             lalala-bolt1)
+                       "3" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareLocalOrShuffleGrouping)}
+                             lalala-bolt2)
+                       "4" (Thrift/prepareBoltDetails
+                             {(Utils/getGlobalStreamId "1" nil)
+                              (Thrift/prepareShuffleGrouping)}
+                             (lalala-bolt3 "_nathan_"))}
+                     )
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"1" [["david"]
+                                                       ["adam"]
+                                                       ]}
+                                     )]
+      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
+      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
+      (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4")))
+      )))
+
+(defbolt punctuator-bolt ["word" "period" "question" "exclamation"]
+  [tuple collector]
+  (if (= (:word tuple) "bar")
+    (do 
+      (emit-bolt! collector {:word "bar" :period "bar" :question "bar"
+                            "exclamation" "bar"})
+      (ack! collector tuple))
+    (let [ res (assoc tuple :period (str (:word tuple) "."))
+           res (assoc res :exclamation (str (:word tuple) "!"))
+           res (assoc res :question (str (:word tuple) "?")) ]
+      (emit-bolt! collector res)
+      (ack! collector tuple))))
+
+(deftest test-map-emit
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [topology (Thrift/buildTopology
+                      {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
+                      {"out" (Thrift/prepareBoltDetails
+                               {(Utils/getGlobalStreamId "words" nil)
+                                (Thrift/prepareShuffleGrouping)}
+                               punctuator-bolt)})
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"words" [["foo"] ["bar"]]}
+                                     )]
+      (is (ms= [["foo" "foo." "foo?" "foo!"]
+                ["bar" "bar" "bar" "bar"]] (read-tuples results "out"))))))
+
+(defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf}
+  [conf context collector]
+  (bolt
+   (execute [tuple]
+            (let [name (.getValue tuple 0)
+                  val (if (= name "!MAX_MSG_TIMEOUT") (.maxTopologyMessageTimeout context) (get conf name))]
+              (emit-bolt! collector [name val] :anchor tuple)
+              (ack! collector tuple))
+            )))
+
+(deftest test-component-specific-config-clojure
+  (with-simulated-time-local-cluster [cluster]
+    (let [topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails
+                            (TestPlannerSpout. (Fields. ["conf"]))
+                            nil
+                            {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})}
+                     {"2" (Thrift/prepareBoltDetails
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareShuffleGrouping)}
+                            (conf-query-bolt {"fake.config" 1
+                                              TOPOLOGY-MAX-TASK-PARALLELISM 2
+                                              TOPOLOGY-MAX-SPOUT-PENDING 10})
+                            nil
+                            {TOPOLOGY-MAX-SPOUT-PENDING 3})})
+          results (complete-topology cluster
+                                     topology
+                                     :topology-name "test123"
+                                     :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10
+                                                  TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
+                                     :mock-sources {"1" [["fake.config"]
+                                                         [TOPOLOGY-MAX-TASK-PARALLELISM]
+                                                         [TOPOLOGY-MAX-SPOUT-PENDING]
+                                                         ["!MAX_MSG_TIMEOUT"]
+                                                         [TOPOLOGY-NAME]
+                                                         ]})]
+      (is (= {"fake.config" 1
+              TOPOLOGY-MAX-TASK-PARALLELISM 2
+              TOPOLOGY-MAX-SPOUT-PENDING 3
+              "!MAX_MSG_TIMEOUT" 40
+              TOPOLOGY-NAME "test123"}
+             (->> (read-tuples results "2")
+                  (apply concat)
+                  (apply hash-map))
+             )))))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/clojure.clj b/storm-core/src/clj/org/apache/storm/clojure.clj
deleted file mode 100644
index 9e1836f..0000000
--- a/storm-core/src/clj/org/apache/storm/clojure.clj
+++ /dev/null
@@ -1,207 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.clojure
-  (:use [org.apache.storm util])
-  (:import [org.apache.storm StormSubmitter])
-  (:import [org.apache.storm.generated StreamInfo])
-  (:import [org.apache.storm.tuple Tuple])
-  (:import [org.apache.storm.task OutputCollector IBolt TopologyContext])
-  (:import [org.apache.storm.spout SpoutOutputCollector ISpout])
-  (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
-  (:import [java.util Collection List])
-  (:require [org.apache.storm [thrift :as thrift]]))
-
-(defn direct-stream [fields]
-  (StreamInfo. fields true))
-
-(defn to-spec [avar]
-  (let [m (meta avar)]
-    [(str (:ns m)) (str (:name m))]))
-
-(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
-  (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec)))
-
-(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
-  `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args))
-
-(defn clojure-spout* [output-spec fn-var conf-var args]
-  (let [m (meta fn-var)]
-    (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec))
-    ))
-
-(defmacro clojure-spout [output-spec fn-sym conf-sym args]
-  `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args))
-
-(defn normalize-fns [body]
-  (for [[name args & impl] body
-        :let [args (-> "this"
-                       gensym
-                       (cons args)
-                       vec)]]
-    (concat [name args] impl)
-    ))
-
-(defmacro bolt [& body]
-  (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
-        fns (normalize-fns bolt-fns)]
-    `(reify IBolt
-       ~@fns
-       ~@other-fns)))
-
-(defmacro bolt-execute [& body]
-  `(bolt
-     (~'execute ~@body)))
-
-(defmacro spout [& body]
-  (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
-        fns (normalize-fns spout-fns)]
-    `(reify ISpout
-       ~@fns
-       ~@other-fns)))
-
-(defmacro defbolt [name output-spec & [opts & impl :as all]]
-  (if-not (map? opts)
-    `(defbolt ~name ~output-spec {} ~@all)
-    (let [worker-name (symbol (str name "__"))
-          conf-fn-name (symbol (str name "__conf__"))
-          params (:params opts)
-          conf-code (:conf opts)
-          fn-body (if (:prepare opts)
-                    (cons 'fn impl)
-                    (let [[args & impl-body] impl
-                          coll-sym (nth args 1)
-                          args (vec (take 1 args))
-                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
-                      `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
-          definer (if params
-                    `(defn ~name [& args#]
-                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
-                    `(def ~name
-                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
-                    )
-          ]
-      `(do
-         (defn ~conf-fn-name ~(if params params [])
-           ~conf-code
-           )
-         (defn ~worker-name ~(if params params [])
-           ~fn-body
-           )
-         ~definer
-         ))))
-
-(defmacro defspout [name output-spec & [opts & impl :as all]]
-  (if-not (map? opts)
-    `(defspout ~name ~output-spec {} ~@all)
-    (let [worker-name (symbol (str name "__"))
-          conf-fn-name (symbol (str name "__conf__"))
-          params (:params opts)
-          conf-code (:conf opts)
-          prepare? (:prepare opts)
-          prepare? (if (nil? prepare?) true prepare?)
-          fn-body (if prepare?
-                    (cons 'fn impl)
-                    (let [[args & impl-body] impl
-                          coll-sym (first args)
-                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
-                      `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
-          definer (if params
-                    `(defn ~name [& args#]
-                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
-                    `(def ~name
-                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
-                    )
-          ]
-      `(do
-         (defn ~conf-fn-name ~(if params params [])
-           ~conf-code
-           )
-         (defn ~worker-name ~(if params params [])
-           ~fn-body
-           )
-         ~definer
-         ))))
-
-(defprotocol TupleValues
-  (tuple-values [values collector stream]))
-
-(extend-protocol TupleValues
-  java.util.Map
-  (tuple-values [this collector ^String stream]
-    (let [^TopologyContext context (:context collector)
-          fields (..  context (getThisOutputFields stream) toList) ]
-      (vec (map (into
-                  (empty this) (for [[k v] this]
-                                   [(if (keyword? k) (name k) k) v]))
-                fields))))
-  java.util.List
-  (tuple-values [this collector stream]
-    this))
-
-(defn- collectify
-  [obj]
-  (if (or (sequential? obj) (instance? Collection obj))
-    obj
-    [obj]))
-
-(defnk emit-bolt! [collector values
-                   :stream Utils/DEFAULT_STREAM_ID :anchor []]
-  (let [^List anchor (collectify anchor)
-        values (tuple-values values collector stream) ]
-    (.emit ^OutputCollector (:output-collector collector) stream anchor values)
-    ))
-
-(defnk emit-direct-bolt! [collector task values
-                          :stream Utils/DEFAULT_STREAM_ID :anchor []]
-  (let [^List anchor (collectify anchor)
-        values (tuple-values values collector stream) ]
-    (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values)
-    ))
-
-(defn ack! [collector ^Tuple tuple]
-  (.ack ^OutputCollector (:output-collector collector) tuple))
-
-(defn fail! [collector ^Tuple tuple]
-  (.fail ^OutputCollector (:output-collector collector) tuple))
-
-(defn report-error! [collector ^Tuple tuple]
-  (.reportError ^OutputCollector (:output-collector collector) tuple))
-
-(defnk emit-spout! [collector values
-                    :stream Utils/DEFAULT_STREAM_ID :id nil]
-  (let [values (tuple-values values collector stream)]
-    (.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))
-
-(defnk emit-direct-spout! [collector task values
-                           :stream Utils/DEFAULT_STREAM_ID :id nil]
-  (let [values (tuple-values values collector stream)]
-    (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
-
-(defalias topology thrift/mk-topology)
-(defalias bolt-spec thrift/mk-bolt-spec)
-(defalias spout-spec thrift/mk-spout-spec)
-(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
-(defalias shell-spout-spec thrift/mk-shell-spout-spec)
-
-(defn submit-remote-topology [name conf topology]
-  (StormSubmitter/submitTopology name conf topology))
-
-(defn local-cluster []
-  ;; do this to avoid a cyclic dependency of
-  ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
-  (eval '(new org.apache.storm.LocalCluster)))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/get_errors.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/get_errors.clj b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
index 615a5f3..4f83a86 100644
--- a/storm-core/src/clj/org/apache/storm/command/get_errors.clj
+++ b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
@@ -15,7 +15,8 @@
 ;; limitations under the License.
 (ns org.apache.storm.command.get-errors
   (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift log])
+  (:use [org.apache.storm log])
+  (:use [org.apache.storm.internal thrift])
   (:use [org.apache.storm util])
   (:require [org.apache.storm.daemon
              [nimbus :as nimbus]

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/monitor.clj b/storm-core/src/clj/org/apache/storm/command/monitor.clj
index 7fa9b2a..4ec49af 100644
--- a/storm-core/src/clj/org/apache/storm/command/monitor.clj
+++ b/storm-core/src/clj/org/apache/storm/command/monitor.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.command.monitor
   (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm.thrift :only [with-configured-nimbus-connection]])
+  (:use [org.apache.storm.internal.thrift :only [with-configured-nimbus-connection]])
   (:import [org.apache.storm.utils Monitor])
   (:gen-class)
  )

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj
index 3868091..8428d14 100644
--- a/storm-core/src/clj/org/apache/storm/command/rebalance.clj
+++ b/storm-core/src/clj/org/apache/storm/command/rebalance.clj
@@ -15,7 +15,8 @@
 ;; limitations under the License.
 (ns org.apache.storm.command.rebalance
   (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift config log])
+  (:use [org.apache.storm config log])
+  (:use [org.apache.storm.internal thrift])
   (:import [org.apache.storm.generated RebalanceOptions])
   (:gen-class))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
index 7e1c3c5..6048246 100644
--- a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
+++ b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
@@ -15,7 +15,8 @@
 ;; limitations under the License.
 (ns org.apache.storm.command.set-log-level
   (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift log])
+  (:use [org.apache.storm log])
+  (:use [org.apache.storm.internal thrift])
   (:import [org.apache.logging.log4j Level])
   (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction])
   (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 0253338..0d29376 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -17,7 +17,7 @@
   (:import [org.apache.storm StormSubmitter]
            [org.apache.storm.utils Utils]
            [org.apache.storm.zookeeper Zookeeper])
-  (:use [org.apache.storm thrift util config log zookeeper])
+  (:use [org.apache.storm util config log zookeeper])
   (:require [clojure.string :as str])
   (:import [org.apache.storm.utils ConfigUtils])
   (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index eb1ec1e..db7fd40 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -16,7 +16,7 @@
 (ns org.apache.storm.daemon.common
   (:use [org.apache.storm log config util])
   (:import [org.apache.storm.generated StormTopology
-            InvalidTopologyException GlobalStreamId]
+            InvalidTopologyException GlobalStreamId Grouping Grouping$_Fields]
            [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils]
            [org.apache.storm.daemon.metrics.reporters PreparableReporter]
            [com.codahale.metrics MetricRegistry])
@@ -28,9 +28,11 @@
   (:import [org.apache.storm.security.auth IAuthorizer])
   (:import [java.io InterruptedIOException]
            [org.json.simple JSONValue])
-  (:require [clojure.set :as set])  
+  (:import [java.util HashMap])
+  (:import [org.apache.storm Thrift])
+  (:require [clojure.set :as set])
   (:require [org.apache.storm.daemon.acker :as acker])
-  (:require [org.apache.storm.thrift :as thrift])
+  (:require [metrics.reporters.jmx :as jmx])
   (:require [metrics.core  :refer [default-registry]]))
 
 (defn start-metrics-reporter [reporter conf]
@@ -91,7 +93,7 @@
 
 (defn topology-bases [storm-cluster-state]
   (let [active-topologies (.active-storms storm-cluster-state)]
-    (into {} 
+    (into {}
           (dofor [id active-topologies]
                  [id (.storm-base storm-cluster-state id nil)]
                  ))
@@ -117,12 +119,12 @@
         )))))
 
 (defn- validate-ids! [^StormTopology topology]
-  (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
+  (let [sets (map #(.getFieldValue topology %) (Thrift/getTopologyFields))
         offending (apply set/intersection sets)]
     (if-not (empty? offending)
       (throw (InvalidTopologyException.
               (str "Duplicate component ids: " offending))))
-    (doseq [f thrift/STORM-TOPOLOGY-FIELDS
+    (doseq [f (Thrift/getTopologyFields)
             :let [obj-map (.getFieldValue topology f)]]
       (if-not (ThriftTopologyUtils/isWorkerHook f)
         (do
@@ -138,7 +140,7 @@
 
 (defn all-components [^StormTopology topology]
   (apply merge {}
-    (for [f thrift/STORM-TOPOLOGY-FIELDS]
+    (for [f (Thrift/getTopologyFields)]
       (if-not (ThriftTopologyUtils/isWorkerHook f)
         (.getFieldValue topology f)))))
 
@@ -151,13 +153,13 @@
 
 (defn validate-basic! [^StormTopology topology]
   (validate-ids! topology)
-  (doseq [f thrift/SPOUT-FIELDS
+  (doseq [f (Thrift/getSpoutFields)
           obj (->> f (.getFieldValue topology) vals)]
     (if-not (empty? (-> obj .get_common .get_inputs))
       (throw (InvalidTopologyException. "May not declare inputs for a spout"))))
   (doseq [[comp-id comp] (all-components topology)
           :let [conf (component-conf comp)
-                p (-> comp .get_common thrift/parallelism-hint)]]
+                p (-> comp .get_common (Thrift/getParallelismHint))]]
     (when (and (> (conf TOPOLOGY-TASKS) 0)
                p
                (<= p 0))
@@ -178,7 +180,7 @@
           (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)]
             (if-not (contains? source-streams source-stream-id)
               (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]")))
-              (if (= :fields (thrift/grouping-type grouping))
+              (if (= Grouping$_Fields/FIELDS (Thrift/groupingType grouping))
                 (let [grouping-fields (set (.get_fields grouping))
                       source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set)
                       diff-fields (set/difference grouping-fields source-stream-fields)]
@@ -190,12 +192,15 @@
         spout-ids (.. topology get_spouts keySet)
         spout-inputs (apply merge
                             (for [id spout-ids]
-                              {[id ACKER-INIT-STREAM-ID] ["id"]}
+                              {(Utils/getGlobalStreamId id ACKER-INIT-STREAM-ID)
+                                (Thrift/prepareFieldsGrouping ["id"])}
                               ))
         bolt-inputs (apply merge
                            (for [id bolt-ids]
-                             {[id ACKER-ACK-STREAM-ID] ["id"]
-                              [id ACKER-FAIL-STREAM-ID] ["id"]}
+                             {(Utils/getGlobalStreamId id ACKER-ACK-STREAM-ID)
+                              (Thrift/prepareFieldsGrouping ["id"])
+                              (Utils/getGlobalStreamId id ACKER-FAIL-STREAM-ID)
+                              (Thrift/prepareFieldsGrouping ["id"])}
                              ))]
     (merge spout-inputs bolt-inputs)))
 
@@ -207,29 +212,31 @@
         spout-ids (.. topology get_spouts keySet)
         spout-inputs (apply merge
                        (for [id spout-ids]
-                         {[id EVENTLOGGER-STREAM-ID] ["component-id"]}
+                         {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID)
+                          (Thrift/prepareFieldsGrouping ["component-id"])}
                          ))
         bolt-inputs (apply merge
                       (for [id bolt-ids]
-                        {[id EVENTLOGGER-STREAM-ID] ["component-id"]}
+                        {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID)
+                         (Thrift/prepareFieldsGrouping ["component-id"])}
                         ))]
     (merge spout-inputs bolt-inputs)))
 
 (defn add-acker! [storm-conf ^StormTopology ret]
   (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
-        acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
-                                         (new org.apache.storm.daemon.acker)
-                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
-                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
-                                          }
-                                         :p num-executors
-                                         :conf {TOPOLOGY-TASKS num-executors
-                                                TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
+        acker-bolt (Thrift/prepareSerializedBoltDetails (acker-inputs ret)
+                                                        (new org.apache.storm.daemon.acker)
+                                                        {ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"])
+                                                         ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"])
+                                                        }
+                                                        (Integer. num-executors)
+                                                        {TOPOLOGY-TASKS num-executors
+                                                         TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
     (dofor [[_ bolt] (.get_bolts ret)
             :let [common (.get_common bolt)]]
            (do
-             (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
-             (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"]))
+             (.put_to_streams common ACKER-ACK-STREAM-ID (Thrift/outputFields ["id" "ack-val"]))
+             (.put_to_streams common ACKER-FAIL-STREAM-ID (Thrift/outputFields ["id"]))
              ))
     (dofor [[_ spout] (.get_spouts ret)
             :let [common (.get_common spout)
@@ -239,13 +246,13 @@
       (do
         ;; this set up tick tuples to cause timeouts to be triggered
         (.set_json_conf common (JSONValue/toJSONString spout-conf))
-        (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
+        (.put_to_streams common ACKER-INIT-STREAM-ID (Thrift/outputFields ["id" "init-val" "spout-task"]))
         (.put_to_inputs common
                         (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
-                        (thrift/mk-direct-grouping))
+                        (Thrift/prepareDirectGrouping))
         (.put_to_inputs common
                         (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
-                        (thrift/mk-direct-grouping))
+                        (Thrift/prepareDirectGrouping))
         ))
     (.put_to_bolts ret "__acker" acker-bolt)
     ))
@@ -254,12 +261,12 @@
   (doseq [[_ component] (all-components topology)
           :let [common (.get_common component)]]
     (.put_to_streams common METRICS-STREAM-ID
-                     (thrift/output-fields ["task-info" "data-points"]))))
+                     (Thrift/outputFields ["task-info" "data-points"]))))
 
 (defn add-system-streams! [^StormTopology topology]
   (doseq [[_ component] (all-components topology)
           :let [common (.get_common component)]]
-    (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
+    (.put_to_streams common SYSTEM-STREAM-ID (Thrift/outputFields ["event"]))))
 
 
 (defn map-occurrences [afn coll]
@@ -280,7 +287,7 @@
   "Generates a list of component ids for each metrics consumer
    e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
   [storm-conf]
-  (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)         
+  (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
        (map #(get % "class"))
        (number-duplicates)
        (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
@@ -288,21 +295,22 @@
 (defn metrics-consumer-bolt-specs [storm-conf topology]
   (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
         inputs (->> (for [comp-id component-ids-that-emit-metrics]
-                      {[comp-id METRICS-STREAM-ID] :shuffle})
+                      {(Utils/getGlobalStreamId comp-id METRICS-STREAM-ID)
+                       (Thrift/prepareShuffleGrouping)})
                     (into {}))
-        
         mk-bolt-spec (fn [class arg p]
-                       (thrift/mk-bolt-spec*
-                        inputs
-                        (org.apache.storm.metric.MetricsConsumerBolt. class arg)
-                        {} :p p :conf {TOPOLOGY-TASKS p}))]
-    
+                       (Thrift/prepareSerializedBoltDetails
+                         inputs
+                         (org.apache.storm.metric.MetricsConsumerBolt. class arg)
+                         {}
+                         (Integer. p)
+                         {TOPOLOGY-TASKS p}))]
+
     (map
-     (fn [component-id register]           
+     (fn [component-id register]
        [component-id (mk-bolt-spec (get register "class")
                                    (get register "argument")
                                    (or (get register "parallelism.hint") 1))])
-     
      (metrics-consumer-register-ids storm-conf)
      (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
 
@@ -313,32 +321,32 @@
 
 (defn add-eventlogger! [storm-conf ^StormTopology ret]
   (let [num-executors (if (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS))
-        eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret)
-                     (EventLoggerBolt.)
-                     {}
-                     :p num-executors
-                     :conf {TOPOLOGY-TASKS num-executors
+        eventlogger-bolt (Thrift/prepareSerializedBoltDetails (eventlogger-inputs ret)
+                           (EventLoggerBolt.)
+                           {}
+                           (Integer. num-executors)
+                           {TOPOLOGY-TASKS num-executors
                             TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
 
     (doseq [[_ component] (all-components ret)
             :let [common (.get_common component)]]
-      (.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields))))
+      (.put_to_streams common EVENTLOGGER-STREAM-ID (Thrift/outputFields (eventlogger-bolt-fields))))
     (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)
     ))
 
-(defn add-metric-components! [storm-conf ^StormTopology topology]  
+(defn add-metric-components! [storm-conf ^StormTopology topology]
   (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
     (.put_to_bolts topology comp-id bolt-spec)))
 
 (defn add-system-components! [conf ^StormTopology topology]
-  (let [system-bolt-spec (thrift/mk-bolt-spec*
+  (let [system-bolt-spec (Thrift/prepareSerializedBoltDetails
                           {}
                           (SystemBolt.)
-                          {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
-                           METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])
-                           CREDENTIALS-CHANGED-STREAM-ID (thrift/output-fields ["creds"])}
-                          :p 0
-                          :conf {TOPOLOGY-TASKS 0})]
+                          {SYSTEM-TICK-STREAM-ID (Thrift/outputFields ["rate_secs"])
+                           METRICS-TICK-STREAM-ID (Thrift/outputFields ["interval"])
+                           CREDENTIALS-CHANGED-STREAM-ID (Thrift/outputFields ["creds"])}
+                          (Integer. 0)
+                          {TOPOLOGY-TASKS 0})]
     (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
 
 (defn system-topology! [storm-conf ^StormTopology topology]
@@ -361,7 +369,7 @@
   (or (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (> (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS) 0)))
 
 (defn num-start-executors [component]
-  (thrift/parallelism-hint (.get_common component)))
+  (Thrift/getParallelismHint (.get_common component)))
 
 ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn storm-task-info
@@ -404,11 +412,10 @@
 
 (defn mk-authorization-handler [klassname conf]
   (let [aznClass (if klassname (Class/forName klassname))
-        aznHandler (if aznClass (.newInstance aznClass))] 
+        aznHandler (if aznClass (.newInstance aznClass))]
     (if aznHandler (.prepare ^IAuthorizer aznHandler conf))
     (log-debug "authorization class name:" klassname
                  " class:" aznClass
                  " handler:" aznHandler)
     aznHandler
-  )) 
-
+  ))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 3af365b..14a2f6e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -15,11 +15,11 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.executor
   (:use [org.apache.storm.daemon common])
-  (:import [org.apache.storm.generated Grouping]
+  (:import [org.apache.storm.generated Grouping Grouping$_Fields]
            [java.io Serializable])
   (:use [org.apache.storm util config log timer stats])
   (:import [java.util List Random HashMap ArrayList LinkedList Map])
-  (:import [org.apache.storm ICredentialsListener])
+  (:import [org.apache.storm ICredentialsListener Thrift])
   (:import [org.apache.storm.hooks ITaskHook])
   (:import [org.apache.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId])
   (:import [org.apache.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
@@ -40,8 +40,7 @@
            [java.util.concurrent ConcurrentLinkedQueue]
            [org.json.simple JSONValue]
            [com.lmax.disruptor.dsl ProducerType])
-  (:require [org.apache.storm [thrift :as thrift]
-             [cluster :as cluster] [stats :as stats]])
+  (:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
   (:require [org.apache.storm.daemon [task :as task]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
   (:require [clojure.set :as set]))
@@ -77,38 +76,38 @@
   (let [num-tasks (count target-tasks)
         random (Random.)
         target-tasks (vec (sort target-tasks))]
-    (condp = (thrift/grouping-type thrift-grouping)
-      :fields
-        (if (thrift/global-grouping? thrift-grouping)
+    (condp = (Thrift/groupingType thrift-grouping)
+      Grouping$_Fields/FIELDS
+        (if (Thrift/isGlobalGrouping thrift-grouping)
           (fn [task-id tuple load]
             ;; It's possible for target to have multiple tasks if it reads multiple sources
             (first target-tasks))
-          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
+          (let [group-fields (Fields. (Thrift/fieldGrouping thrift-grouping))]
             (mk-fields-grouper out-fields group-fields target-tasks)
             ))
-      :all
+      Grouping$_Fields/ALL
         (fn [task-id tuple load] target-tasks)
-      :shuffle
+      Grouping$_Fields/SHUFFLE
         (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)
-      :local-or-shuffle
+      Grouping$_Fields/LOCAL_OR_SHUFFLE
         (let [same-tasks (set/intersection
                            (set target-tasks)
                            (set (.getThisWorkerTasks context)))]
           (if-not (empty? same-tasks)
             (mk-shuffle-grouper (vec same-tasks) topo-conf context component-id stream-id)
             (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)))
-      :none
+      Grouping$_Fields/NONE
         (fn [task-id tuple load]
           (let [i (mod (.nextInt random) num-tasks)]
             (get target-tasks i)
             ))
-      :custom-object
-        (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
+      Grouping$_Fields/CUSTOM_OBJECT
+        (let [grouping (Thrift/instantiateJavaObject (.get_custom_object thrift-grouping))]
           (mk-custom-grouper grouping context component-id stream-id target-tasks))
-      :custom-serialized
+      Grouping$_Fields/CUSTOM_SERIALIZED
         (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)]
           (mk-custom-grouper grouping context component-id stream-id target-tasks))
-      :direct
+      Grouping$_Fields/DIRECT
         :direct
       )))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index a097e36..77abdec 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -27,8 +27,8 @@
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
   (:import [java.util Collection List ArrayList])
+  (:import [org.apache.storm Thrift])
   (:require [org.apache.storm
-             [thrift :as thrift]
              [stats :as stats]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
 
@@ -83,7 +83,7 @@
                 (ShellBolt. obj))
               obj )
         obj (if (instance? JavaObject obj)
-              (thrift/instantiate-java-object obj)
+              (Thrift/instantiateJavaObject obj)
               obj )]
     obj
     ))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/internal/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/clojure.clj 
b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
new file mode 100644
index 0000000..3f29757
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
@@ -0,0 +1,201 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.internal.clojure
+  (:use [org.apache.storm util])
+  (:import [org.apache.storm StormSubmitter])
+  (:import [org.apache.storm.generated StreamInfo])
+  (:import [org.apache.storm.tuple Tuple])
+  (:import [org.apache.storm.task OutputCollector IBolt TopologyContext])
+  (:import [org.apache.storm.spout SpoutOutputCollector ISpout])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
+  (:import [java.util Collection List])
+  (:require [org.apache.storm.internal [thrift :as thrift]]))
+
+;; Builds a StreamInfo for `fields` with the constructor's boolean flag set
+;; to true (mk-output-spec in internal/thrift.clj passes false instead).
+(defn direct-stream [fields]
+  (let [direct? true]
+    (StreamInfo. fields direct?)))
+
+;; Returns a two-element vector [namespace-string name-string] read from the
+;; :ns and :name entries of `avar`'s metadata.
+(defn to-spec [avar]
+  (let [{var-ns :ns var-name :name} (meta avar)]
+    [(str var-ns) (str var-name)]))
+
+;; Constructs a ClojureBolt from the [ns name] specs of `fn-var` and
+;; `conf-fn-var`, the constructor `args`, and the output spec normalised by
+;; thrift/mk-output-spec.
+(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
+  (let [fn-spec (to-spec fn-var)
+        conf-spec (to-spec conf-fn-var)
+        outputs (thrift/mk-output-spec output-spec)]
+    (ClojureBolt. fn-spec conf-spec args outputs)))
+
+;; Macro front-end for clojure-bolt*: turns the two symbols into (var ...)
+;; forms so the callee receives vars rather than their values.
+(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
+  (list `clojure-bolt*
+        output-spec
+        (list 'var fn-sym)
+        (list 'var conf-fn-sym)
+        args))
+
+;; Constructs a ClojureSpout from the [ns name] specs of `fn-var` and
+;; `conf-var`, the constructor `args`, and the output spec normalised by
+;; thrift/mk-output-spec.
+;; The previously bound-but-unused `(meta fn-var)` local has been dropped;
+;; to-spec already reads the metadata it needs.
+(defn clojure-spout* [output-spec fn-var conf-var args]
+  (ClojureSpout. (to-spec fn-var)
+                 (to-spec conf-var)
+                 args
+                 (thrift/mk-output-spec output-spec)))
+
+;; Macro front-end for clojure-spout*: turns the two symbols into (var ...)
+;; forms so the callee receives vars rather than their values.
+(defmacro clojure-spout [output-spec fn-sym conf-sym args]
+  (list `clojure-spout*
+        output-spec
+        (list 'var fn-sym)
+        (list 'var conf-sym)
+        args))
+
+;; Rewrites each method form (name [args...] body...) so that its argument
+;; vector starts with a freshly gensym'd "this" symbol, as reify requires.
+;; Returns a lazy sequence of the rewritten forms.
+(defn normalize-fns [body]
+  (map (fn [[fn-name params & impl]]
+         (let [self-sym (gensym "this")
+               full-params (vec (cons self-sym params))]
+           (concat [fn-name full-params] impl)))
+       body))
+
+;; Expands to a (reify IBolt ...) form. Leading non-symbol forms in `body`
+;; are treated as IBolt method definitions and get an implicit `this`
+;; argument via normalize-fns; everything from the first bare symbol onward
+;; is spliced in unchanged.
+(defmacro bolt [& body]
+  (let [[method-forms trailing-forms] (split-with (complement symbol?) body)
+        methods (normalize-fns method-forms)]
+    `(reify IBolt
+       ~@methods
+       ~@trailing-forms)))
+
+;; Shorthand for a bolt that only defines `execute`: wraps `body` (argument
+;; vector followed by the method body) in an (execute ...) form and hands it
+;; to the bolt macro.
+(defmacro bolt-execute [& body]
+  (list `bolt (list* 'execute body)))
+
+;; Expands to a (reify ISpout ...) form. Leading non-symbol forms in `body`
+;; are treated as ISpout method definitions and get an implicit `this`
+;; argument via normalize-fns; everything from the first bare symbol onward
+;; is spliced in unchanged.
+(defmacro spout [& body]
+  (let [[method-forms trailing-forms] (split-with (complement symbol?) body)
+        methods (normalize-fns method-forms)]
+    `(reify ISpout
+       ~@methods
+       ~@trailing-forms)))
+
+;; Defines a bolt called `name`.
+;;   output-spec - passed through to clojure-bolt (and from there to
+;;                 thrift/mk-output-spec).
+;;   opts        - optional map; recognised keys are :params, :conf and
+;;                 :prepare. When the first form after output-spec is not a
+;;                 map, the macro re-invokes itself with an empty opts map.
+;;   impl        - the implementation forms.
+;; The expansion defines three things: `<name>__conf__` (returns the :conf
+;; code), `<name>__` (returns the bolt-building fn) and `name` itself.
+(defmacro defbolt [name output-spec & [opts & impl :as all]]
+  (if-not (map? opts)
+    `(defbolt ~name ~output-spec {} ~@all)
+    (let [worker-name (symbol (str name "__"))
+          conf-fn-name (symbol (str name "__conf__"))
+          params (:params opts)
+          conf-code (:conf opts)
+          ;; With :prepare truthy the impl forms are used verbatim as the body
+          ;; of a fn. Otherwise impl is ([tuple collector] body...): the
+          ;; second arg names the collector, the first becomes the sole
+          ;; argument of a generated `execute` method.
+          fn-body (if (:prepare opts)
+                    (cons 'fn impl)
+                    (let [[args & impl-body] impl
+                          coll-sym (nth args 1)
+                          args (vec (take 1 args))
+                          prepargs [(gensym "conf") (gensym "context") 
coll-sym]]
+                      `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
+          ;; With :params, `name` is a function taking the constructor args;
+          ;; without, it is a plain var holding a bolt built with no args.
+          definer (if params
+                    `(defn ~name [& args#]
+                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name 
args#))
+                    `(def ~name
+                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name 
[]))
+                    )
+          ]
+      `(do
+         (defn ~conf-fn-name ~(if params params [])
+           ~conf-code
+           )
+         (defn ~worker-name ~(if params params [])
+           ~fn-body
+           )
+         ~definer
+         ))))
+
+;; Defines a spout called `name`.
+;;   output-spec - passed through to clojure-spout (and from there to
+;;                 thrift/mk-output-spec).
+;;   opts        - optional map; recognised keys are :params, :conf and
+;;                 :prepare. When the first form after output-spec is not a
+;;                 map, the macro re-invokes itself with an empty opts map.
+;;   impl        - the implementation forms.
+;; The expansion defines three things: `<name>__conf__` (returns the :conf
+;; code), `<name>__` (returns the spout-building fn) and `name` itself.
+(defmacro defspout [name output-spec & [opts & impl :as all]]
+  (if-not (map? opts)
+    `(defspout ~name ~output-spec {} ~@all)
+    (let [worker-name (symbol (str name "__"))
+          conf-fn-name (symbol (str name "__conf__"))
+          params (:params opts)
+          conf-code (:conf opts)
+          prepare? (:prepare opts)
+          ;; Unlike defbolt, an absent :prepare defaults to true here.
+          prepare? (if (nil? prepare?) true prepare?)
+          ;; With prepare? truthy the impl forms are used verbatim as the body
+          ;; of a fn. Otherwise impl is ([collector] body...) and the body
+          ;; becomes a zero-argument `nextTuple` method.
+          fn-body (if prepare?
+                    (cons 'fn impl)
+                    (let [[args & impl-body] impl
+                          coll-sym (first args)
+                          prepargs [(gensym "conf") (gensym "context") 
coll-sym]]
+                      `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
+          ;; With :params, `name` is a function taking the constructor args;
+          ;; without, it is a plain var holding a spout built with no args.
+          definer (if params
+                    `(defn ~name [& args#]
+                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name 
args#))
+                    `(def ~name
+                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name 
[]))
+                    )
+          ]
+      `(do
+         (defn ~conf-fn-name ~(if params params [])
+           ~conf-code
+           )
+         (defn ~worker-name ~(if params params [])
+           ~fn-body
+           )
+         ~definer
+         ))))
+
+;; Protocol for turning user-supplied emit values into the value list that is
+;; handed to the collector. `collector` and `stream` are available to
+;; implementations that need them to resolve the values.
+(defprotocol TupleValues
+  (tuple-values [values collector stream]))
+
+;; TupleValues implementations:
+;;  * Map  - keyword keys are converted to their name strings, then the values
+;;           are looked up in the order of the stream's declared output
+;;           fields (taken from the :context entry of `collector`).
+;;  * List - returned unchanged.
+(extend-protocol TupleValues
+  java.util.Map
+  (tuple-values [this collector ^String stream]
+    (let [^TopologyContext context (:context collector)
+          field-names (.toList (.getThisOutputFields context stream))
+          by-name (into (empty this)
+                        (map (fn [[k v]]
+                               [(if (keyword? k) (name k) k) v])
+                             this))]
+      (mapv by-name field-names)))
+  java.util.List
+  (tuple-values [this collector stream]
+    this))
+
+;; Returns `obj` unchanged when it is already sequential or a
+;; java.util.Collection; otherwise wraps it in a one-element vector.
+(defn- collectify
+  [obj]
+  (cond
+    (sequential? obj) obj
+    (instance? Collection obj) obj
+    :else [obj]))
+
+;; Emits `values` on `stream` through the OutputCollector stored under
+;; :output-collector in `collector`. `anchor` may be a single item or a
+;; collection; it is normalised with collectify. `values` goes through
+;; tuple-values first.
+(defnk emit-bolt! [collector values
+                   :stream Utils/DEFAULT_STREAM_ID :anchor []]
+  (let [^List anchors (collectify anchor)
+        tuple-vals (tuple-values values collector stream)
+        ^OutputCollector out (:output-collector collector)]
+    (.emit out stream anchors tuple-vals)))
+
+;; Like emit-bolt!, but calls emitDirect with the given `task` on the
+;; OutputCollector stored under :output-collector in `collector`.
+(defnk emit-direct-bolt! [collector task values
+                          :stream Utils/DEFAULT_STREAM_ID :anchor []]
+  (let [^List anchors (collectify anchor)
+        tuple-vals (tuple-values values collector stream)
+        ^OutputCollector out (:output-collector collector)]
+    (.emitDirect out task stream anchors tuple-vals)))
+
+;; Acks `tuple` on the OutputCollector stored under :output-collector.
+(defn ack! [collector ^Tuple tuple]
+  (let [^OutputCollector out (:output-collector collector)]
+    (.ack out tuple)))
+
+;; Fails `tuple` on the OutputCollector stored under :output-collector.
+(defn fail! [collector ^Tuple tuple]
+  (let [^OutputCollector out (:output-collector collector)]
+    (.fail out tuple)))
+
+;; Reports `error` through the OutputCollector stored under :output-collector.
+;; OutputCollector.reportError takes a Throwable, so the argument is hinted
+;; as ^Throwable (it was previously mis-hinted as ^Tuple, which prevented the
+;; compiler from resolving the method and forced a reflective call).
+(defn report-error! [collector ^Throwable error]
+  (.reportError ^OutputCollector (:output-collector collector) error))
+
+;; Emits `values` on `stream` with message id `id` through the
+;; SpoutOutputCollector stored under :output-collector in `collector`.
+;; `values` goes through tuple-values first.
+(defnk emit-spout! [collector values
+                    :stream Utils/DEFAULT_STREAM_ID :id nil]
+  (let [tuple-vals (tuple-values values collector stream)
+        ^SpoutOutputCollector out (:output-collector collector)]
+    (.emit out stream tuple-vals id)))
+
+;; Like emit-spout!, but calls emitDirect with the given `task` on the
+;; SpoutOutputCollector stored under :output-collector in `collector`.
+(defnk emit-direct-spout! [collector task values
+                           :stream Utils/DEFAULT_STREAM_ID :id nil]
+  (let [tuple-vals (tuple-values values collector stream)
+        ^SpoutOutputCollector out (:output-collector collector)]
+    (.emitDirect out task stream tuple-vals id)))
+
+;; Thin wrapper that forwards `name`, `conf` and `topology` unchanged to
+;; StormSubmitter/submitTopology.
+(defn submit-remote-topology [name conf topology]
+  (StormSubmitter/submitTopology name conf topology))
+
+;; Creates a new org.apache.storm.LocalCluster. The class is instantiated
+;; through `eval` on a quoted form so that this namespace does not reference
+;; the class when it is compiled.
+(defn local-cluster []
+  ;; do this to avoid a cyclic dependency of
+  ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
+  (eval '(new org.apache.storm.LocalCluster)))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/internal/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/thrift.clj 
b/storm-core/src/clj/org/apache/storm/internal/thrift.clj
new file mode 100644
index 0000000..4ccf8a7
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/internal/thrift.clj
@@ -0,0 +1,96 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.internal.thrift
+  (:import [java.util HashMap]
+           [java.io Serializable]
+           [org.apache.storm.generated NodeInfo Assignment])
+  (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology
+            StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
+            ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
+            GlobalStreamId ComponentObject ComponentObject$_Fields
+            ShellComponent SupervisorInfo])
+  (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils])
+  (:import [org.apache.storm Constants])
+  (:import [org.apache.storm.security.auth ReqContext])
+  (:import [org.apache.storm.grouping CustomStreamGrouping])
+  (:import [org.apache.storm.topology TopologyBuilder])
+  (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
+  (:import [org.apache.thrift.transport TTransport])
+  (:use [org.apache.storm util config log zookeeper]))
+
+;; Keeping this definition because core.clj still uses it as a nested keyword argument.
+;; It must be removed once core.clj is ported to Java.
+;; Lookup table from each Grouping$_Fields enum value to the keyword used for
+;; that grouping kind on the Clojure side.
+(def grouping-constants
+  {Grouping$_Fields/FIELDS :fields
+   Grouping$_Fields/SHUFFLE :shuffle
+   Grouping$_Fields/ALL :all
+   Grouping$_Fields/NONE :none
+   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
+   Grouping$_Fields/CUSTOM_OBJECT :custom-object
+   Grouping$_Fields/DIRECT :direct
+   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
+
+;; Keeping this function because core.clj still uses it as a nested keyword argument.
+;; It must be removed once core.clj is ported to Java.
+;; Returns the keyword from grouping-constants that corresponds to the field
+;; currently set on `grouping`.
+(defn grouping-type
+  [^Grouping grouping]
+  (let [set-field (.getSetField grouping)]
+    (get grouping-constants set-field)))
+
+;; Opens a NimbusClient to `host`:`port` (optionally as user `as-user`, which
+;; defaults to nil) using the Storm config read via ConfigUtils, and returns
+;; a vector [client transport].
+(defn nimbus-client-and-conn
+  ([host port]
+   (nimbus-client-and-conn host port nil))
+  ([host port as-user]
+   (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
+   (let [storm-conf (clojurify-structure (ConfigUtils/readStormConfig))
+         nimbus (NimbusClient. storm-conf host port nil as-user)]
+     [(.getClient nimbus) (.transport nimbus)])))
+
+;; Binds `client-sym` to a Nimbus client connected to `host`:`port`, runs
+;; `body`, and closes the underlying transport in a finally clause.
+(defmacro with-nimbus-connection
+  [[client-sym host port] & body]
+  `(let [[^Nimbus$Client ~client-sym ^TTransport transport#] (nimbus-client-and-conn ~host ~port)]
+     (try
+       ~@body
+       (finally
+         (.close transport#)))))
+
+;; Binds `client-sym` to a Nimbus client obtained from
+;; NimbusClient/getConfiguredClientAs, using the Storm config and the name of
+;; the principal in the current ReqContext (nil when there is no principal).
+;; Runs `body` and closes the underlying transport in a finally clause.
+(defmacro with-configured-nimbus-connection
+  [client-sym & body]
+  `(let [storm-conf# (clojurify-structure (ConfigUtils/readStormConfig))
+         req-context# (ReqContext/context)
+         user-name# (when (.principal req-context#)
+                      (.getName (.principal req-context#)))
+         nimbus# (NimbusClient/getConfiguredClientAs storm-conf# user-name#)
+         ~client-sym (.getClient nimbus#)
+         transport# (.transport nimbus#)]
+     (try
+       ~@body
+       (finally
+         (.close transport#)))))
+
+;; Keeping this definition because core.clj still uses it as a nested keyword argument.
+;; It must be removed once core.clj is ported to Java.
+;; Normalises an output spec into a map of stream id -> StreamInfo.
+;; A non-map spec is treated as the spec for Utils/DEFAULT_STREAM_ID. Values
+;; that are already StreamInfo instances are kept; anything else is wrapped
+;; as (StreamInfo. value false).
+(defn mk-output-spec
+  [output-spec]
+  (let [by-stream (if (map? output-spec)
+                    output-spec
+                    {Utils/DEFAULT_STREAM_ID output-spec})
+        ->stream-info (fn [out]
+                        (if (instance? StreamInfo out)
+                          out
+                          (StreamInfo. out false)))]
+    (map-val ->stream-info by-stream)))

http://git-wip-us.apache.org/repos/asf/storm/blob/de9cb106/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj 
b/storm-core/src/clj/org/apache/storm/testing.clj
index c872742..4ad5ff8 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -44,13 +44,15 @@
   (:import [org.apache.storm.transactional TransactionalSpoutCoordinator])
   (:import [org.apache.storm.transactional.partitioned 
PartitionedTransactionalSpoutExecutor])
   (:import [org.apache.storm.tuple Tuple])
+  (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm.task TopologyContext]
            (org.apache.storm.messaging IContext)
            [org.json.simple JSONValue])
   (:require [org.apache.storm [zookeeper :as zk]])
   (:require [org.apache.storm.daemon.acker :as acker])
-  (:use [org.apache.storm cluster util thrift config log local-state]))
+  (:use [org.apache.storm cluster util config log local-state])
+  (:use [org.apache.storm.internal thrift]))
 
 (defn feeder-spout
   [fields]
@@ -526,7 +528,7 @@
   (for [[_ spout-spec] spec-map]
     (-> spout-spec
         .get_spout_object
-        deserialized-component-object)))
+        (Thrift/deserializeComponentObject))))
 
 (defn capture-topology
   [topology]
@@ -543,11 +545,11 @@
                 (assoc (clojurify-structure bolts)
                   (Utils/uuid)
                   (Bolt.
-                    (serialize-component-object capturer)
-                    (mk-plain-component-common (into {} (for [[id direct?] 
all-streams]
+                    (Thrift/serializeComponentObject capturer)
+                    (Thrift/prepareComponentCommon (into {} (for [[id direct?] 
all-streams]
                                                           [id (if direct?
-                                                                
(mk-direct-grouping)
-                                                                
(mk-global-grouping))]))
+                                                                
(Thrift/prepareDirectGrouping)
+                                                                
(Thrift/prepareGlobalGrouping))]))
                                                {}
                                                nil))))
     {:topology topology
@@ -577,7 +579,7 @@
                               mock-sources)]
     (doseq [[id spout] replacements]
       (let [spout-spec (get spouts id)]
-        (.set_spout_object spout-spec (serialize-component-object spout))))
+        (.set_spout_object spout-spec (Thrift/serializeComponentObject 
spout))))
     (doseq [spout (spout-objects spouts)]
       (when-not (extends? CompletableSpout (.getClass spout))
         (throw (RuntimeException. (str "Cannot complete topology unless every 
spout is a CompletableSpout (or mocked to be); failed by " spout)))))
@@ -636,12 +638,12 @@
    (let [track-id (::track-id tracked-cluster)
          ret (.deepCopy topology)]
      (dofor [[_ bolt] (.get_bolts ret)
-             :let [obj (deserialized-component-object (.get_bolt_object 
bolt))]]
-            (.set_bolt_object bolt (serialize-component-object
+             :let [obj (Thrift/deserializeComponentObject (.get_bolt_object 
bolt))]]
+            (.set_bolt_object bolt (Thrift/serializeComponentObject
                                      (BoltTracker. obj track-id))))
      (dofor [[_ spout] (.get_spouts ret)
-             :let [obj (deserialized-component-object (.get_spout_object 
spout))]]
-            (.set_spout_object spout (serialize-component-object
+             :let [obj (Thrift/deserializeComponentObject (.get_spout_object 
spout))]]
+            (.set_spout_object spout (Thrift/serializeComponentObject
                                        (SpoutTracker. obj track-id))))
      {:topology ret
       :last-spout-emit (atom 0)
@@ -723,8 +725,9 @@
                    (->> (iterate inc 1)
                         (take (count values))
                         (map #(str "field" %))))
-        spout-spec (mk-spout-spec* (TestWordSpout.)
-                                   {stream fields})
+        spout-spec (Thrift/prepareSerializedSpoutDetails
+                     (TestWordSpout.)
+                     {stream fields})
         topology (StormTopology. {component spout-spec} {} {})
         context (TopologyContext.
                   topology

Reply via email to