http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index cfda54e..384f97c 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -17,22 +17,22 @@
  */
 package org.apache.storm.redis.trident;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.trident.state.RedisMapState;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
-import storm.trident.operation.builtin.MapGet;
-import storm.trident.operation.builtin.Sum;
-import storm.trident.state.StateFactory;
-import storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
 
 public class WordCountTridentRedisMap {
     public static StormTopology buildTopology(String redisHost, Integer redisPort){

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
index 4a9599e..2c56c39 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
@@ -18,11 +18,11 @@
 
 package org.apache.storm.solr.bolt;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java
index 9fa38cd..d078959 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java
@@ -20,7 +20,7 @@ package org.apache.storm.solr.mapper;
 
 import static org.apache.storm.solr.schema.SolrFieldTypeFinder.FieldTypeWrapper;
 
-import backtype.storm.tuple.ITuple;
+import org.apache.storm.tuple.ITuple;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java
index f9d6e9b..704ec2d 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java
@@ -18,7 +18,7 @@
 
 package org.apache.storm.solr.mapper;
 
-import backtype.storm.tuple.ITuple;
+import org.apache.storm.tuple.ITuple;
 import com.google.gson.Gson;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java
index 96242d1..5c0223b 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java
@@ -18,8 +18,8 @@
 
 package org.apache.storm.solr.mapper;
 
-import backtype.storm.tuple.ITuple;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
 import org.apache.solr.client.solrj.SolrRequest;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
index 8187d11..d84d140 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
@@ -18,7 +18,7 @@
 
 package org.apache.storm.solr.trident;
 
-import backtype.storm.topology.FailedException;
+import org.apache.storm.topology.FailedException;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -26,8 +26,8 @@ import org.apache.storm.solr.config.SolrConfig;
 import org.apache.storm.solr.mapper.SolrMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.trident.state.State;
-import storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
index a1f815d..7b092ba 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
@@ -18,11 +18,11 @@
 
 package org.apache.storm.solr.trident;
 
-import backtype.storm.task.IMetricsContext;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.solr.config.SolrConfig;
 import org.apache.storm.solr.mapper.SolrMapper;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java
index db7b995..53698fa 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java
@@ -18,9 +18,9 @@
 
 package org.apache.storm.solr.trident;
 
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java
index 7d2357c..8e3390d 100644
--- a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java
@@ -18,12 +18,12 @@
 
 package org.apache.storm.solr.spout;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
 import org.apache.storm.solr.util.TestUtil;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java
index 6afed2c..bb0c83c 100644
--- a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java
@@ -18,12 +18,12 @@
 
 package org.apache.storm.solr.spout;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import org.apache.storm.solr.util.TestUtil;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
index 809f434..5c9f16d 100644
--- a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
@@ -18,8 +18,8 @@
 
 package org.apache.storm.solr.topology;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.solr.bolt.SolrUpdateBolt;
 import org.apache.storm.solr.config.CountBasedCommit;
 import org.apache.storm.solr.config.SolrCommitStrategy;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java
index 02e6d6f..24e6b5e 100644
--- a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java
@@ -18,8 +18,8 @@
 
 package org.apache.storm.solr.topology;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.solr.bolt.SolrUpdateBolt;
 import org.apache.storm.solr.mapper.SolrJsonMapper;
 import org.apache.storm.solr.mapper.SolrMapper;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java
index 607cf98..e0f4dc6 100644
--- a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java
@@ -18,10 +18,10 @@
 
 package org.apache.storm.solr.topology;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.storm.solr.config.SolrCommitStrategy;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
index 4884c82..d022c8a 100644
--- a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
@@ -18,13 +18,13 @@
 
 package org.apache.storm.solr.trident;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.solr.spout.SolrFieldsSpout;
 import org.apache.storm.solr.topology.SolrFieldsTopology;
-import storm.trident.Stream;
-import storm.trident.TridentTopology;
-import storm.trident.state.StateFactory;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
index d03b1dd..75131b8 100644
--- a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
@@ -18,13 +18,13 @@
 
 package org.apache.storm.solr.trident;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.solr.spout.SolrJsonSpout;
 import org.apache.storm.solr.topology.SolrJsonTopology;
-import storm.trident.Stream;
-import storm.trident.TridentTopology;
-import storm.trident.state.StateFactory;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/log4j2/cluster.xml
----------------------------------------------------------------------
diff --git a/log4j2/cluster.xml b/log4j2/cluster.xml
index f349d8c..ca333b2 100644
--- a/log4j2/cluster.xml
+++ b/log4j2/cluster.xml
@@ -73,15 +73,15 @@
 </appenders>
 <loggers>
 
-    <Logger name="backtype.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false">
+    <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false">
         <AppenderRef ref="WEB-ACCESS"/>
         <AppenderRef ref="syslog"/>
     </Logger>
-    <Logger name="backtype.storm.logging.ThriftAccessLogger" level="info" additivity="false">
+    <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" additivity="false">
         <AppenderRef ref="THRIFT-ACCESS"/>
         <AppenderRef ref="syslog"/>
     </Logger>
-    <Logger name="backtype.storm.metric.LoggingMetricsConsumer" level="info">
+    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info">
         <AppenderRef ref="METRICS"/>
     </Logger>
     <root level="info"> <!-- We log everything -->

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 181c2a9..bced62b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -241,7 +241,7 @@
         <maven-surefire.version>2.18.1</maven-surefire.version>
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
-        <java.unit.test.exclude>backtype.storm.testing.IntegrationTest</java.unit.test.exclude>
+        <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
         <java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include>    <!--maven surefire plugin default test list-->
         <!-- by default the clojure test set are all clojure tests that are not integration tests. This property is overridden in the profiles -->
         <clojure.test.set>!integration.*</clojure.test.set>
@@ -364,7 +364,7 @@
             <id>all-tests</id>
             <properties>
                 <java.integration.test.include>**/*.java</java.integration.test.include>
-                <java.integration.test.group>backtype.storm.testing.IntegrationTest</java.integration.test.group>
+                <java.integration.test.group>org.apache.storm.testing.IntegrationTest</java.integration.test.group>
                 <clojure.test.set>*.*</clojure.test.set>
             </properties>
         </profile>
@@ -374,7 +374,7 @@
                 <!--Java-->
                 <java.unit.test.include>no.unit.tests</java.unit.test.include>
                 <java.integration.test.include>**/*.java</java.integration.test.include>
-                <java.integration.test.group>backtype.storm.testing.IntegrationTest</java.integration.test.group>
+                <java.integration.test.group>org.apache.storm.testing.IntegrationTest</java.integration.test.group>
                 <!--Clojure-->
                 <clojure.test.set>integration.*</clojure.test.set>
                 <clojure.test.declared.namespace.only>true</clojure.test.declared.namespace.only>

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 765e798..08fffa4 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -1018,9 +1018,9 @@
                                 <configuration>
                                     <append>true</append>
                                     <excludes>
-                                        <exclude>backtype/storm/metric/api/IMetricsConsumer$DataPointFieldAccess</exclude>
-                                        <exclude>backtype/storm/metric/api/IMetricsConsumer$TaskInfoFieldAccess</exclude>
-                                        <exclude>backtype/storm/testing/TestSerObjectFieldAccess</exclude>
+                                        <exclude>org/apache/storm/metric/api/IMetricsConsumer$DataPointFieldAccess</exclude>
+                                        <exclude>org/apache/storm/metric/api/IMetricsConsumer$TaskInfoFieldAccess</exclude>
+                                        <exclude>org/apache/storm/testing/TestSerObjectFieldAccess</exclude>
                                     </excludes>
                                 </configuration>
                             </execution>
@@ -1032,7 +1032,7 @@
                                 </goals>
                                 <configuration>
                                     <excludes>
-                                        <exclude>backtype/storm/generated/*</exclude> <!--Thrift generated code-->
+                                        <exclude>org/apache/storm/generated/*</exclude> <!--Thrift generated code-->
                                     </excludes>
                                     <includes>
                                         <include>backtype/*/*/*/*</include>

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalCluster.clj b/storm-core/src/clj/backtype/storm/LocalCluster.clj
deleted file mode 100644
index aa37c89..0000000
--- a/storm-core/src/clj/backtype/storm/LocalCluster.clj
+++ /dev/null
@@ -1,106 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.LocalCluster
-  (:use [backtype.storm testing config util])
-  (:import [backtype.storm.utils Utils])
-  (:import [java.util Map])
-  (:gen-class
-    :init init
-    :implements [backtype.storm.ILocalCluster]
-    :constructors {[] []
-                   [java.util.Map] []
-                   [String Long] []}
-    :state state))
-
-(defn -init
-  ([]
-   (let [ret (mk-local-storm-cluster
-               :daemon-conf
-               {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
-     [[] ret]))
-  ([^String zk-host ^Long zk-port]
-   (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
-                                                     STORM-ZOOKEEPER-SERVERS (list zk-host)
-                                                     STORM-ZOOKEEPER-PORT zk-port})]
-     [[] ret]))
-  ([^Map stateMap]
-     [[] stateMap]))
-
-(defn submit-hook [hook name conf topology]
-  (let [topologyInfo (Utils/getTopologyInfo name nil conf)]
-    (.notify hook topologyInfo conf topology)))
-
-(defn -submitTopology
-  [this name conf topology]
-  (submit-local-topology
-    (:nimbus (. this state)) name conf topology)
-  (let [hook (get-configured-class conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
-    (when hook (submit-hook hook name conf topology))))
-
-
-(defn -submitTopologyWithOpts
-  [this name conf topology submit-opts]
-  (submit-local-topology-with-opts
-    (:nimbus (. this state)) name conf topology submit-opts))
-
-(defn -uploadNewCredentials
-  [this name creds]
-  (.uploadNewCredentials (:nimbus (. this state)) name creds))
-
-(defn -shutdown
-  [this]
-  (kill-local-storm-cluster (. this state)))
-
-(defn -killTopology
-  [this name]
-  (.killTopology (:nimbus (. this state)) name))
-
-(defn -getTopologyConf
-  [this id]
-  (.getTopologyConf (:nimbus (. this state)) id))
-
-(defn -getTopology
-  [this id]
-  (.getTopology (:nimbus (. this state)) id))
-
-(defn -getClusterInfo
-  [this]
-  (.getClusterInfo (:nimbus (. this state))))
-
-(defn -getTopologyInfo
-  [this id]
-  (.getTopologyInfo (:nimbus (. this state)) id))
-
-(defn -killTopologyWithOpts
-  [this name opts]
-  (.killTopologyWithOpts (:nimbus (. this state)) name opts))
-
-(defn -activate
-  [this name]
-  (.activate (:nimbus (. this state)) name))
-
-(defn -deactivate
-  [this name]
-  (.deactivate (:nimbus (. this state)) name))
-
-(defn -rebalance
-  [this name opts]
-  (.rebalance (:nimbus (. this state)) name opts))
-
-(defn -getState
-  [this]
-  (.state this))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/LocalDRPC.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalDRPC.clj b/storm-core/src/clj/backtype/storm/LocalDRPC.clj
deleted file mode 100644
index 9773821..0000000
--- a/storm-core/src/clj/backtype/storm/LocalDRPC.clj
+++ /dev/null
@@ -1,56 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.LocalDRPC
-  (:require [backtype.storm.daemon [drpc :as drpc]])
-  (:use [backtype.storm config util])
-  (:import [backtype.storm.utils InprocMessaging ServiceRegistry])
-  (:gen-class
-   :init init
-   :implements [backtype.storm.ILocalDRPC]
-   :constructors {[] []}
-   :state state ))
-
-(defn -init []
-  (let [handler (drpc/service-handler (read-storm-config))
-        id (ServiceRegistry/registerService handler)
-        ]
-    [[] {:service-id id :handler handler}]
-    ))
-
-(defn -execute [this func funcArgs]
-  (.execute (:handler (. this state)) func funcArgs)
-  )
-
-(defn -result [this id result]
-  (.result (:handler (. this state)) id result)
-  )
-
-(defn -fetchRequest [this func]
-  (.fetchRequest (:handler (. this state)) func)
-  )
-
-(defn -failRequest [this id]
-  (.failRequest (:handler (. this state)) id)
-  )
-
-(defn -getServiceId [this]
-  (:service-id (. this state)))
-
-(defn -shutdown [this]
-  (ServiceRegistry/unregisterService (:service-id (. this state)))
-  (.shutdown (:handler (. this state)))
-  )

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/MockAutoCred.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/MockAutoCred.clj b/storm-core/src/clj/backtype/storm/MockAutoCred.clj
deleted file mode 100644
index 5e37528..0000000
--- a/storm-core/src/clj/backtype/storm/MockAutoCred.clj
+++ /dev/null
@@ -1,58 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-;;mock implementation of INimbusCredentialPlugin,IAutoCredentials and ICredentialsRenewer for testing only.
-(ns backtype.storm.MockAutoCred
-  (:use [backtype.storm testing config])
-  (:import [backtype.storm.security.INimbusCredentialPlugin]
-           [backtype.storm.security.auth   ICredentialsRenewer])
-  (:gen-class
-    :implements [backtype.storm.security.INimbusCredentialPlugin
-                 backtype.storm.security.auth.IAutoCredentials
-                 backtype.storm.security.auth.ICredentialsRenewer]))
-
-(def nimbus-cred-key "nimbusCredTestKey")
-(def nimbus-cred-val "nimbusTestCred")
-(def nimbus-cred-renew-val "renewedNimbusTestCred")
-(def gateway-cred-key "gatewayCredTestKey")
-(def gateway-cred-val "gatewayTestCred")
-(def gateway-cred-renew-val "renewedGatewayTestCred")
-
-(defn -populateCredentials
-  ([this creds conf]
-  (.put creds nimbus-cred-key nimbus-cred-val))
-  ([this creds]
-  (.put creds gateway-cred-key gateway-cred-val)))
-
-(defn -prepare
-  [this conf])
-
-(defn -renew
-  [this cred conf]
-  (.put cred nimbus-cred-key nimbus-cred-renew-val)
-  (.put cred gateway-cred-key gateway-cred-renew-val))
-
-(defn -populateSubject
-  [subject credentials]
-  (.add (.getPublicCredentials subject) (.get credentials nimbus-cred-key))
-  (.add (.getPublicCredentials subject) (.get credentials gateway-cred-key)))
-
-(defn -updateSubject
-  [subject credentials]
-  (-populateSubject subject credentials))
-
-
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/blobstore.clj b/storm-core/src/clj/backtype/storm/blobstore.clj
deleted file mode 100644
index 936f4b5..0000000
--- a/storm-core/src/clj/backtype/storm/blobstore.clj
+++ /dev/null
@@ -1,28 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.blobstore
-  (:import [backtype.storm.utils Utils])
-  (:import [backtype.storm.blobstore ClientBlobStore])
-  (:use [backtype.storm config]))
-
-(defmacro with-configured-blob-client
-  "Binds `client-sym` to the ClientBlobStore obtained from
-  Utils/getClientBlobStore for the config returned by read-storm-config,
-  evaluates `body`, and calls .shutdown on the client in a finally clause."
-  [client-sym & body]
-  `(let [storm-conf# (read-storm-config)
-         ^ClientBlobStore ~client-sym (Utils/getClientBlobStore storm-conf#)]
-     (try
-       ~@body
-       (finally
-         (.shutdown ~client-sym)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/clojure.clj 
b/storm-core/src/clj/backtype/storm/clojure.clj
deleted file mode 100644
index a73166a..0000000
--- a/storm-core/src/clj/backtype/storm/clojure.clj
+++ /dev/null
@@ -1,201 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.clojure
-  (:use [backtype.storm util])
-  (:import [backtype.storm StormSubmitter])
-  (:import [backtype.storm.generated StreamInfo])
-  (:import [backtype.storm.tuple Tuple])
-  (:import [backtype.storm.task OutputCollector IBolt TopologyContext])
-  (:import [backtype.storm.spout SpoutOutputCollector ISpout])
-  (:import [backtype.storm.utils Utils])
-  (:import [backtype.storm.clojure ClojureBolt ClojureSpout])
-  (:import [java.util List])
-  (:require [backtype.storm [thrift :as thrift]]))
-
-(defn direct-stream
-  "Builds a StreamInfo for `fields` with its second constructor argument set
-  to true."
-  [fields]
-  (new StreamInfo fields true))
-
-(defn to-spec
-  "Returns a two-element vector [namespace-string name-string] read from the
-  :ns and :name metadata of the var `avar`."
-  [avar]
-  (let [{var-ns :ns var-name :name} (meta avar)]
-    [(str var-ns) (str var-name)]))
-
-(defn clojure-bolt*
-  "Constructs a ClojureBolt from the specs of the vars `fn-var` and
-  `conf-fn-var`, the argument list `args`, and the output spec produced by
-  thrift/mk-output-spec from `output-spec`."
-  [output-spec fn-var conf-fn-var args]
-  (ClojureBolt. (to-spec fn-var)
-                (to-spec conf-fn-var)
-                args
-                (thrift/mk-output-spec output-spec)))
-
-(defmacro clojure-bolt
-  "Expands to a clojure-bolt* call, passing the vars named by the symbols
-  `fn-sym` and `conf-fn-sym`."
-  [output-spec fn-sym conf-fn-sym args]
-  `(clojure-bolt* ~output-spec
-                  (var ~fn-sym)
-                  (var ~conf-fn-sym)
-                  ~args))
-
-(defn clojure-spout*
-  "Constructs a ClojureSpout from the specs of the vars `fn-var` and
-  `conf-var`, the argument list `args`, and the output spec produced by
-  thrift/mk-output-spec from `output-spec`."
-  [output-spec fn-var conf-var args]
-  ;; The previous version bound (meta fn-var) to a local that was never read.
-  (ClojureSpout. (to-spec fn-var)
-                 (to-spec conf-var)
-                 args
-                 (thrift/mk-output-spec output-spec)))
-
-(defmacro clojure-spout
-  "Expands to a clojure-spout* call, passing the vars named by the symbols
-  `fn-sym` and `conf-sym`."
-  [output-spec fn-sym conf-sym args]
-  `(clojure-spout* ~output-spec
-                   (var ~fn-sym)
-                   (var ~conf-sym)
-                   ~args))
-
-(defn normalize-fns
-  "Takes method forms shaped like (name [args...] & impl) and returns them
-  with a fresh gensym'd \"this\" symbol prepended to each argument vector."
-  [body]
-  (map (fn [[fn-name fn-args & impl]]
-         (let [args-with-this (vec (cons (gensym "this") fn-args))]
-           (concat [fn-name args-with-this] impl)))
-       body))
-
-(defmacro bolt
-  "Expands to a reify of IBolt. The leading non-symbol forms of `body` are
-  treated as IBolt method forms and passed through normalize-fns; everything
-  from the first symbol onward is spliced into the reify unchanged."
-  [& body]
-  (let [[method-forms trailing-forms] (split-with (complement symbol?) body)]
-    `(reify IBolt
-       ~@(normalize-fns method-forms)
-       ~@trailing-forms)))
-
-(defmacro bolt-execute
-  "Shorthand for a `bolt` form whose only method is execute; `body` is the
-  argument vector followed by the method body."
-  [& body]
-  `(bolt (~'execute ~@body)))
-
-(defmacro spout
-  "Expands to a reify of ISpout. The leading non-symbol forms of `body` are
-  treated as ISpout method forms and passed through normalize-fns; everything
-  from the first symbol onward is spliced into the reify unchanged."
-  [& body]
-  (let [[method-forms trailing-forms] (split-with (complement symbol?) body)]
-    `(reify ISpout
-       ~@(normalize-fns method-forms)
-       ~@trailing-forms)))
-
-;; Defines a bolt named `name`. The expansion contains three definitions:
-;;   * <name>__conf__ : a fn whose body is the :conf option,
-;;   * <name>__       : a fn that returns the bolt implementation,
-;;   * <name>         : the clojure-bolt wrapping both of the above; it is a
-;;                      fn of `& args` when :params is given, a plain def
-;;                      otherwise.
-;; If the form following output-spec is not a map, the macro re-expands itself
-;; with an empty options map and treats that form as part of the implementation.
-(defmacro defbolt [name output-spec & [opts & impl :as all]]
-  (if-not (map? opts)
-    `(defbolt ~name ~output-spec {} ~@all)
-    (let [worker-name (symbol (str name "__"))
-          conf-fn-name (symbol (str name "__conf__"))
-          params (:params opts)
-          conf-code (:conf opts)
-          ;; With :prepare, impl is used verbatim as the body of a fn.
-          ;; Without it, impl is [args & body]: the first arg becomes the sole
-          ;; argument of the generated execute method and the second arg names
-          ;; the third (collector) parameter of the generated prepare fn.
-          fn-body (if (:prepare opts)
-                    (cons 'fn impl)
-                    (let [[args & impl-body] impl
-                          coll-sym (nth args 1)
-                          args (vec (take 1 args))
-                          prepargs [(gensym "conf") (gensym "context") 
coll-sym]]
-                      `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
-          definer (if params
-                    `(defn ~name [& args#]
-                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name 
args#))
-                    `(def ~name
-                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name 
[]))
-                    )
-          ]
-      `(do
-         (defn ~conf-fn-name ~(if params params [])
-           ~conf-code
-           )
-         (defn ~worker-name ~(if params params [])
-           ~fn-body
-           )
-         ~definer
-         ))))
-
-;; Defines a spout named `name`. The expansion contains three definitions:
-;;   * <name>__conf__ : a fn whose body is the :conf option,
-;;   * <name>__       : a fn that returns the spout implementation,
-;;   * <name>         : the clojure-spout wrapping both of the above; it is a
-;;                      fn of `& args` when :params is given, a plain def
-;;                      otherwise.
-;; If the form following output-spec is not a map, the macro re-expands itself
-;; with an empty options map and treats that form as part of the implementation.
-(defmacro defspout [name output-spec & [opts & impl :as all]]
-  (if-not (map? opts)
-    `(defspout ~name ~output-spec {} ~@all)
-    (let [worker-name (symbol (str name "__"))
-          conf-fn-name (symbol (str name "__conf__"))
-          params (:params opts)
-          conf-code (:conf opts)
-          prepare? (:prepare opts)
-          ;; :prepare defaults to true when it is absent (nil)
-          prepare? (if (nil? prepare?) true prepare?)
-          ;; With prepare, impl is used verbatim as the body of a fn.
-          ;; Without it, impl is [args & body]: the first arg names the third
-          ;; (collector) parameter of the generated fn and body becomes a
-          ;; zero-argument nextTuple method.
-          fn-body (if prepare?
-                    (cons 'fn impl)
-                    (let [[args & impl-body] impl
-                          coll-sym (first args)
-                          prepargs [(gensym "conf") (gensym "context") 
coll-sym]]
-                      `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
-          definer (if params
-                    `(defn ~name [& args#]
-                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name 
args#))
-                    `(def ~name
-                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name 
[]))
-                    )
-          ]
-      `(do
-         (defn ~conf-fn-name ~(if params params [])
-           ~conf-code
-           )
-         (defn ~worker-name ~(if params params [])
-           ~fn-body
-           )
-         ~definer
-         ))))
-
-(defprotocol TupleValues
-  "Conversion of a user-supplied value container into the values to emit."
-  (tuple-values [values collector stream]
-    "Returns the values to emit for `values` on `stream`, given `collector`."))
-
-(extend-protocol TupleValues
-  ;; A map is turned into a vector ordered like the output fields that the
-  ;; collector's :context reports for `stream`; keyword keys are looked up by
-  ;; their name.
-  java.util.Map
-  (tuple-values [this collector ^String stream]
-    (let [^TopologyContext context (:context collector)
-          field-names (.toList (.getThisOutputFields context stream))
-          by-field-name (into (empty this)
-                              (map (fn [[k v]]
-                                     [(if (keyword? k) (name k) k) v])
-                                   this))]
-      (mapv by-field-name field-names)))
-  ;; A list is returned unchanged.
-  java.util.List
-  (tuple-values [this collector stream]
-    this))
-
-;; Emits `values` (converted with tuple-values) through the OutputCollector
-;; stored under :output-collector in `collector`.
-;; Keyword options: :stream (default Utils/DEFAULT_STREAM_ID) and :anchor
-;; (default []), which is passed through collectify before use.
-(defnk emit-bolt! [collector values
-                   :stream Utils/DEFAULT_STREAM_ID :anchor []]
-  (let [^List anchors (collectify anchor)
-        out-values (tuple-values values collector stream)
-        ^OutputCollector out (:output-collector collector)]
-    (.emit out stream anchors out-values)))
-
-;; Like emit-bolt!, but calls emitDirect with the target `task`.
-;; Keyword options: :stream (default Utils/DEFAULT_STREAM_ID) and :anchor
-;; (default []), which is passed through collectify before use.
-(defnk emit-direct-bolt! [collector task values
-                          :stream Utils/DEFAULT_STREAM_ID :anchor []]
-  (let [^List anchors (collectify anchor)
-        out-values (tuple-values values collector stream)
-        ^OutputCollector out (:output-collector collector)]
-    (.emitDirect out task stream anchors out-values)))
-
-(defn ack!
-  "Calls .ack with `tuple` on the OutputCollector stored under
-  :output-collector in `collector`."
-  [collector ^Tuple tuple]
-  (let [^OutputCollector out (:output-collector collector)]
-    (.ack out tuple)))
-
-(defn fail!
-  "Calls .fail with `tuple` on the OutputCollector stored under
-  :output-collector in `collector`."
-  [collector ^Tuple tuple]
-  (let [^OutputCollector out (:output-collector collector)]
-    (.fail out tuple)))
-
-(defn report-error!
-  "Calls .reportError with `error` on the OutputCollector stored under
-  :output-collector in `collector`.
-  The second argument is the error to report, not a tuple: the earlier ^Tuple
-  hint and `tuple` name did not match what reportError accepts."
-  [collector ^Throwable error]
-  (.reportError ^OutputCollector (:output-collector collector) error))
-
-;; Emits `values` (converted with tuple-values) through the
-;; SpoutOutputCollector stored under :output-collector in `collector`.
-;; Keyword options: :stream (default Utils/DEFAULT_STREAM_ID) and :id
-;; (default nil), passed as the last argument of .emit.
-(defnk emit-spout! [collector values
-                    :stream Utils/DEFAULT_STREAM_ID :id nil]
-  (let [out-values (tuple-values values collector stream)
-        ^SpoutOutputCollector out (:output-collector collector)]
-    (.emit out stream out-values id)))
-
-;; Like emit-spout!, but calls emitDirect with the target `task`.
-;; Keyword options: :stream (default Utils/DEFAULT_STREAM_ID) and :id
-;; (default nil), passed as the last argument of .emitDirect.
-(defnk emit-direct-spout! [collector task values
-                           :stream Utils/DEFAULT_STREAM_ID :id nil]
-  (let [out-values (tuple-values values collector stream)
-        ^SpoutOutputCollector out (:output-collector collector)]
-    (.emitDirect out task stream out-values id)))
-
-;; Shorter names in this namespace for the mk-* builder functions of the
-;; thrift namespace (required above as backtype.storm.thrift).
-(defalias topology thrift/mk-topology)
-(defalias bolt-spec thrift/mk-bolt-spec)
-(defalias spout-spec thrift/mk-spout-spec)
-(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
-(defalias shell-spout-spec thrift/mk-shell-spout-spec)
-
-(defn submit-remote-topology
-  "Passes `name`, `conf` and `topology` straight to
-  StormSubmitter/submitTopology."
-  [name conf topology]
-  (StormSubmitter/submitTopology name conf topology))
-
-(defn local-cluster
-  "Creates a new backtype.storm.LocalCluster.
-  The class is instantiated through eval at call time instead of being
-  referenced at compile time, which avoids the cyclic dependency
-  LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster."
-  []
-  (eval '(new backtype.storm.LocalCluster)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj 
b/storm-core/src/clj/backtype/storm/cluster.clj
deleted file mode 100644
index ebe4955..0000000
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ /dev/null
@@ -1,691 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.cluster
-  (:import [org.apache.zookeeper.data Stat ACL Id]
-           [backtype.storm.generated SupervisorInfo Assignment StormBase 
ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
-            LogConfig ProfileAction ProfileRequest NodeInfo]
-           [java.io Serializable])
-  (:import [org.apache.zookeeper KeeperException 
KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
-  (:import [org.apache.curator.framework CuratorFramework])
-  (:import [backtype.storm.utils Utils])
-  (:import [backtype.storm.cluster ClusterState ClusterStateContext 
ClusterStateListener ConnectionState])
-  (:import [java.security MessageDigest])
-  (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
-  (:import [backtype.storm.nimbus NimbusInfo])
-  (:use [backtype.storm util log config converter])
-  (:require [backtype.storm [zookeeper :as zk]])
-  (:require [backtype.storm.daemon [common :as common]]))
-
-(defn mk-topo-only-acls
-  "Returns a two-element ACL vector for a topology: the first entry of
-  ZooDefs$Ids/CREATOR_ALL_ACL plus a READ ACL for the \"digest\" id derived
-  from the STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD entry of `topo-conf`.
-  Returns nil when Utils/isZkAuthenticationConfiguredTopology is false for
-  `topo-conf`."
-  [topo-conf]
-  (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
-    (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
-      (vector
-        (first ZooDefs$Ids/CREATOR_ALL_ACL)
-        (ACL. ZooDefs$Perms/READ
-              (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))))))
- 
-;; Instantiates the cluster-state factory class named by
-;; STORM-CLUSTER-STATE-STORE in `conf` (falling back to the zookeeper state
-;; factory) and returns the result of its mkState call, or nil when that
-;; result is falsey.
-;; Keyword options: :auth-conf (nil), :acls (nil), :context (a new
-;; ClusterStateContext).
-(defnk mk-distributed-cluster-state
-  [conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
-  (let [factory-class-name (or (conf STORM-CLUSTER-STATE-STORE)
-                               "backtype.storm.cluster_state.zookeeper_state_factory")
-        clazz (Class/forName factory-class-name)
-        state-instance (.newInstance clazz)]
-    (log-debug "Creating cluster state: " (.toString clazz))
-    (when-let [state (.mkState state-instance conf auth-conf acls context)]
-      state)))
-
-;; Operations on the cluster state kept by Storm (assignments, storm bases,
-;; supervisors, heartbeats, backpressure, errors, blobstore keys, nimbus
-;; entries, credentials, log configs and profiler requests).
-;; The implementation in this file is the reify returned by
-;; mk-storm-cluster-state. There, a non-nil `callback`/`cb` argument is stored
-;; and the underlying read is issued with its watch flag set to
-;; (not-nil? callback).
-(defprotocol StormClusterState
-  (assignments [this callback])
-  (assignment-info [this storm-id callback])
-  (assignment-info-with-version [this storm-id callback])
-  (assignment-version [this storm-id callback])
-  ;returns key information under /storm/blobstore/key
-  (blobstore-info [this blob-key])
-  ;returns list of nimbus summaries stored under 
/stormroot/nimbuses/<nimbus-ids> -> <data>
-  (nimbuses [this])
-  ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
-  (add-nimbus-host! [this nimbus-id nimbus-summary])
-
-  (active-storms [this])
-  (storm-base [this storm-id callback])
-  (get-worker-heartbeat [this storm-id node port])
-  (get-worker-profile-requests [this storm-id nodeinfo thrift?])
-  (get-topology-profile-requests [this storm-id thrift?])
-  (set-worker-profile-request [this storm-id profile-request])
-  (delete-topology-profile-requests [this storm-id profile-request])
-  (executor-beats [this storm-id executor->node+port])
-  (supervisors [this callback])
-  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
-  (setup-heartbeats! [this storm-id])
-  (teardown-heartbeats! [this storm-id])
-  (teardown-topology-errors! [this storm-id])
-  (heartbeat-storms [this])
-  (error-topologies [this])
-  (set-topology-log-config! [this storm-id log-config])
-  (topology-log-config [this storm-id cb])
-  (worker-heartbeat! [this storm-id node port info])
-  (remove-worker-heartbeat! [this storm-id node port])
-  (supervisor-heartbeat! [this supervisor-id info])
-  (worker-backpressure! [this storm-id node port info])
-  (topology-backpressure [this storm-id callback])
-  (setup-backpressure! [this storm-id])
-  (remove-worker-backpressure! [this storm-id node port])
-  (activate-storm! [this storm-id storm-base])
-  (update-storm! [this storm-id new-elems])
-  (remove-storm-base! [this storm-id])
-  (set-assignment! [this storm-id info])
-  ;; sets up information related to key consisting of nimbus
-  ;; host:port and version info of the blob
-  (setup-blobstore! [this key nimbusInfo versionInfo])
-  (active-keys [this])
-  (blobstore [this callback])
-  (remove-storm! [this storm-id])
-  (remove-blobstore-key! [this blob-key])
-  (remove-key-version! [this blob-key])
-  (report-error [this storm-id component-id node port error])
-  (errors [this storm-id component-id])
-  (last-error [this storm-id component-id])
-  (set-credentials! [this storm-id creds topo-conf])
-  (credentials [this storm-id callback])
-  (disconnect [this]))
-
-;; Names of the top-level nodes of the cluster state, without a leading "/".
-;; The *-SUBTREE constants prefix these with "/"; the dispatch in
-;; mk-storm-cluster-state compares the first path token against these names.
-(def ASSIGNMENTS-ROOT "assignments")
-(def CODE-ROOT "code")
-(def STORMS-ROOT "storms")
-(def SUPERVISORS-ROOT "supervisors")
-(def WORKERBEATS-ROOT "workerbeats")
-(def BACKPRESSURE-ROOT "backpressure")
-(def ERRORS-ROOT "errors")
-(def BLOBSTORE-ROOT "blobstore")
-; Stores the latest update sequence for a blob
-(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber")
-(def NIMBUSES-ROOT "nimbuses")
-(def CREDENTIALS-ROOT "credentials")
-(def LOGCONFIG-ROOT "logconfigs")
-(def PROFILERCONFIG-ROOT "profilerconfigs")
-
-;; Absolute paths of the top-level nodes: "/" followed by the matching *-ROOT
-;; name. The *-path helper functions build per-id paths underneath these.
-(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
-(def STORMS-SUBTREE (str "/" STORMS-ROOT))
-(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
-(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
-(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
-(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
-;; Blobstore subtree /storm/blobstore
-(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT))
-(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" 
BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT))
-(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
-(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
-(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
-(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT))
-
-(defn supervisor-path
-  "Path of the node for supervisor `id`: SUPERVISORS-SUBTREE/id."
-  [id]
-  (str SUPERVISORS-SUBTREE "/" id))
-
-(defn assignment-path
-  "Path of the assignment node for `id`: ASSIGNMENTS-SUBTREE/id."
-  [id]
-  (str ASSIGNMENTS-SUBTREE "/" id))
-
-(defn blobstore-path
-  "Path of the blobstore node for `key`: BLOBSTORE-SUBTREE/key."
-  [key]
-  (str BLOBSTORE-SUBTREE "/" key))
-
-(defn blobstore-max-key-sequence-number-path
-  "Path of the max-sequence-number node for `key`:
-  BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE/key."
-  [key]
-  (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key))
-
-(defn nimbus-path
-  "Path of the node for nimbus `id`: NIMBUSES-SUBTREE/id."
-  [id]
-  (str NIMBUSES-SUBTREE "/" id))
-
-(defn storm-path
-  "Path of the storm-base node for `id`: STORMS-SUBTREE/id."
-  [id]
-  (str STORMS-SUBTREE "/" id))
-
-(defn workerbeat-storm-root
-  "Directory holding the worker heartbeats of `storm-id`:
-  WORKERBEATS-SUBTREE/storm-id."
-  [storm-id]
-  (str WORKERBEATS-SUBTREE "/" storm-id))
-
-(defn workerbeat-path
-  "Heartbeat path of one worker: <workerbeat-storm-root>/node-port."
-  [storm-id node port]
-  (let [root (workerbeat-storm-root storm-id)]
-    (str root "/" node "-" port)))
-
-(defn backpressure-storm-root
-  "Directory holding the backpressure nodes of `storm-id`:
-  BACKPRESSURE-SUBTREE/storm-id."
-  [storm-id]
-  (str BACKPRESSURE-SUBTREE "/" storm-id))
-
-(defn backpressure-path
-  "Backpressure path of one worker: <backpressure-storm-root>/node-port."
-  [storm-id node port]
-  (let [root (backpressure-storm-root storm-id)]
-    (str root "/" node "-" port)))
-
-(defn error-storm-root
-  "Directory holding the reported errors of `storm-id`:
-  ERRORS-SUBTREE/storm-id."
-  [storm-id]
-  (str ERRORS-SUBTREE "/" storm-id))
-
-(defn error-path
-  "Error path of one component: <error-storm-root>/<url-encoded component-id>."
-  [storm-id component-id]
-  (let [root (error-storm-root storm-id)
-        encoded-id (url-encode component-id)]
-    (str root "/" encoded-id)))
-
-;; Suffix that last-error-path appends, after a "-", to the url-encoded
-;; component id.
-(def last-error-path-seg "last-error")
-
-(defn last-error-path
-  "Path of the last-error node of one component:
-  <error-storm-root>/<url-encoded component-id>-<last-error-path-seg>."
-  [storm-id component-id]
-  (let [root (error-storm-root storm-id)
-        encoded-id (url-encode component-id)]
-    (str root "/" encoded-id "-" last-error-path-seg)))
-
-(defn credentials-path
-  "Path of the credentials node of `storm-id`: CREDENTIALS-SUBTREE/storm-id."
-  [storm-id]
-  (str CREDENTIALS-SUBTREE "/" storm-id))
-
-(defn log-config-path
-  "Path of the log-config node of `storm-id`: LOGCONFIG-SUBTREE/storm-id."
-  [storm-id]
-  (str LOGCONFIG-SUBTREE "/" storm-id))
-
-(defn profiler-config-path
-  "With one argument, the profiler-config directory of `storm-id`:
-  PROFILERCONFIG-SUBTREE/storm-id. With four, the node of a single request
-  below it, named host_port_request-type."
-  ([storm-id]
-   (str PROFILERCONFIG-SUBTREE "/" storm-id))
-  ([storm-id host port request-type]
-   (let [topology-dir (profiler-config-path storm-id)]
-     (str topology-dir "/" host "_" port "_" request-type))))
-
-(defn- issue-callback!
-  "Reads the callback held in `cb-atom`, resets the atom to nil, and then
-  invokes the callback (with no arguments) if there was one."
-  [cb-atom]
-  (let [pending (deref cb-atom)]
-    ;; clear before invoking, so the callback is registered for one use only
-    (reset! cb-atom nil)
-    (if pending
-      (pending)
-      nil)))
-
-(defn- issue-map-callback!
-  "Looks up the callback stored under `id` in the map held by `cb-atom`,
-  removes that entry, and then invokes the callback with `id` if there was
-  one."
-  [cb-atom id]
-  (let [pending ((deref cb-atom) id)]
-    ;; remove before invoking, so the callback is registered for one use only
-    (swap! cb-atom dissoc id)
-    (if pending
-      (pending id)
-      nil)))
-
-(defn- maybe-deserialize
-  "Returns Utils/deserialize of `ser` as `clazz`, or nil when `ser` is
-  falsey."
-  [ser clazz]
-  (if ser
-    (Utils/deserialize ser clazz)
-    nil))
-
-;; Record with the fields error, time-secs, host and port.
-(defrecord TaskError [error time-secs host port])
-
-(defn- parse-error-path
-  "Parses everything after the first character of `p` as a long."
-  [^String p]
-  (let [digits (subs p 1)]
-    (Long/parseLong digits)))
-
-(defn convert-executor-beats
-  "Builds a map from executor to {:time-secs :uptime :stats} out of the
-  worker heartbeat `worker-hb`, keeping only those of `executors` that have an
-  entry in the heartbeat's :executor-stats."
-  [executors worker-hb]
-  (let [executor-stats (:executor-stats worker-hb)]
-    (reduce (fn [beats executor]
-              (if (contains? executor-stats executor)
-                (assoc beats executor {:time-secs (:time-secs worker-hb)
-                                       :uptime (:uptime worker-hb)
-                                       :stats (get executor-stats executor)})
-                beats))
-            {}
-            executors)))
-
-;; Watches should be used for optimization. When ZK is reconnecting, they're 
not guaranteed to be called.
-(defnk mk-storm-cluster-state
-  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
-  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
-                                [false cluster-state-spec]
-                                [true (mk-distributed-cluster-state 
cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
-        assignment-info-callback (atom {})
-        assignment-info-with-version-callback (atom {})
-        assignment-version-callback (atom {})
-        supervisors-callback (atom nil)
-        backpressure-callback (atom {})   ;; we want to reigister a topo 
directory getChildren callback for all workers of this dir
-        assignments-callback (atom nil)
-        storm-base-callback (atom {})
-        blobstore-callback (atom nil)
-        credentials-callback (atom {})
-        log-config-callback (atom {})
-        state-id (.register
-                  cluster-state
-                  (fn [type path]
-                    (let [[subtree & args] (tokenize-path path)]
-                      (condp = subtree
-                         ASSIGNMENTS-ROOT (if (empty? args)
-                                             (issue-callback! 
assignments-callback)
-                                             (do
-                                               (issue-map-callback! 
assignment-info-callback (first args))
-                                               (issue-map-callback! 
assignment-version-callback (first args))
-                                               (issue-map-callback! 
assignment-info-with-version-callback (first args))))
-                         SUPERVISORS-ROOT (issue-callback! 
supervisors-callback)
-                         BLOBSTORE-ROOT (issue-callback! blobstore-callback) 
;; callback register for blobstore
-                         STORMS-ROOT (issue-map-callback! storm-base-callback 
(first args))
-                         CREDENTIALS-ROOT (issue-map-callback! 
credentials-callback (first args))
-                         LOGCONFIG-ROOT (issue-map-callback! 
log-config-callback (first args))
-                         BACKPRESSURE-ROOT (issue-map-callback! 
backpressure-callback (first args))
-                         ;; this should never happen
-                         (exit-process! 30 "Unknown callback for subtree " 
subtree args)))))]
-    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE 
WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
-               LOGCONFIG-SUBTREE]]
-      (.mkdirs cluster-state p acls))
-    (reify
-      StormClusterState
-
-      (assignments
-        [this callback]
-        (when callback
-          (reset! assignments-callback callback))
-        (.get_children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
-
-      (assignment-info
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-info-callback assoc storm-id callback))
-        (clojurify-assignment (maybe-deserialize (.get_data cluster-state 
(assignment-path storm-id) (not-nil? callback)) Assignment)))
-
-      (assignment-info-with-version 
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-info-with-version-callback assoc storm-id 
callback))
-        (let [{data :data version :version} 
-              (.get_data_with_version cluster-state (assignment-path storm-id) 
(not-nil? callback))]
-        {:data (clojurify-assignment (maybe-deserialize data Assignment))
-         :version version}))
-
-      (assignment-version 
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-version-callback assoc storm-id callback))
-        (.get_version cluster-state (assignment-path storm-id) (not-nil? 
callback)))
-
-      ;; blobstore state
-      (blobstore
-        [this callback]
-        (when callback
-          (reset! blobstore-callback callback))
-        (.sync_path cluster-state BLOBSTORE-SUBTREE)
-        (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback)))
-
-      (nimbuses
-        [this]
-        (map #(maybe-deserialize (.get_data cluster-state (nimbus-path %1) 
false) NimbusSummary)
-          (.get_children cluster-state NIMBUSES-SUBTREE false)))
-
-      (add-nimbus-host!
-        [this nimbus-id nimbus-summary]
-        ;explicit delete for ephmeral node to ensure this session creates the 
entry.
-        (.delete_node cluster-state (nimbus-path nimbus-id))
-
-        (.add_listener cluster-state (reify ClusterStateListener
-                        (^void stateChanged[this ^ConnectionState newState]
-                          (log-message "Connection state listener invoked, 
zookeeper connection state has changed to " newState)
-                          (if (.equals newState ConnectionState/RECONNECTED)
-                            (do
-                              (log-message "Connection state has changed to 
reconnected so setting nimbuses entry one more time")
-                              (.set_ephemeral_node cluster-state (nimbus-path 
nimbus-id) (Utils/serialize nimbus-summary) acls))))))
-        
-        (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) 
(Utils/serialize nimbus-summary) acls))
-
-      (setup-blobstore!
-        [this key nimbusInfo versionInfo]
-        (let [path (str (blobstore-path key) "/" (.toHostPortString 
nimbusInfo) "-" versionInfo)]
-          (log-message "setup-path" path)
-          (.mkdirs cluster-state (blobstore-path key) acls)
-          ;we delete the node first to ensure the node gets created as part of 
this session only.
-          (.delete_node_blobstore cluster-state (str (blobstore-path key)) 
(.toHostPortString nimbusInfo))
-          (.set_ephemeral_node cluster-state path nil acls)))
-
-      (blobstore-info
-        [this blob-key]
-        (let [path (blobstore-path blob-key)]
-          (.sync_path cluster-state path)
-          (.get_children cluster-state path false)))
-
-      (active-storms
-        [this]
-        (.get_children cluster-state STORMS-SUBTREE false))
-
-      (active-keys
-        [this]
-        (.get_children cluster-state BLOBSTORE-SUBTREE false))
-
-      (heartbeat-storms
-        [this]
-        (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false))
-
-      (error-topologies
-        [this]
-        (.get_children cluster-state ERRORS-SUBTREE false))
-
-      (get-worker-heartbeat
-        [this storm-id node port]
-        (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path 
storm-id node port) false)]
-          (if worker-hb
-            (-> worker-hb
-              (maybe-deserialize ClusterWorkerHeartbeat)
-              clojurify-zk-worker-hb))))
-
-      (executor-beats
-        [this storm-id executor->node+port]
-        ;; need to take executor->node+port in explicitly so that we don't run 
into a situation where a
-        ;; long dead worker with a skewed clock overrides all the timestamps. 
By only checking heartbeats
-        ;; with an assigned node+port, and only reading executors from that 
heartbeat that are actually assigned,
-        ;; we avoid situations like that
-        (let [node+port->executors (reverse-map executor->node+port)
-              all-heartbeats (for [[[node port] executors] 
node+port->executors]
-                               (->> (get-worker-heartbeat this storm-id node 
port)
-                                    (convert-executor-beats executors)
-                                    ))]
-          (apply merge all-heartbeats)))
-
-      (supervisors
-        [this callback]
-        (when callback
-          (reset! supervisors-callback callback))
-        (.get_children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
-
-      (supervisor-info
-        [this supervisor-id]
-        (clojurify-supervisor-info (maybe-deserialize (.get_data cluster-state 
(supervisor-path supervisor-id) false) SupervisorInfo)))
-
-      (topology-log-config
-        [this storm-id cb]
-        (when cb
-          (swap! log-config-callback assoc storm-id cb))
-        (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) 
(not-nil? cb)) LogConfig))
-
-      (set-topology-log-config!
-        [this storm-id log-config]
-        (.set_data cluster-state (log-config-path storm-id) (Utils/serialize 
log-config) acls))
-
-      (set-worker-profile-request
-        [this storm-id profile-request]
-        (let [request-type (.get_action profile-request)
-              host (.get_node (.get_nodeInfo profile-request))
-              port (first (.get_port (.get_nodeInfo profile-request)))]
-          (.set_data cluster-state
-                     (profiler-config-path storm-id host port request-type)
-                     (Utils/serialize profile-request)
-                     acls)))
-
-      (get-topology-profile-requests
-        [this storm-id thrift?]
-        (let [path (profiler-config-path storm-id)
-              requests (if (.node_exists cluster-state path false)
-                         (dofor [c (.get_children cluster-state path false)]
-                                (let [raw (.get_data cluster-state (str path 
"/" c) false)
-                                      request (maybe-deserialize raw 
ProfileRequest)]
-                                      (if thrift?
-                                        request
-                                        (clojurify-profile-request 
request)))))]
-          requests))
-
-      (delete-topology-profile-requests
-        [this storm-id profile-request]
-        (let [profile-request-inst (thriftify-profile-request profile-request)
-              action (:action profile-request)
-              host (:host profile-request)
-              port (:port profile-request)]
-          (.delete_node cluster-state
-           (profiler-config-path storm-id host port action))))
-          
-      (get-worker-profile-requests
-        [this storm-id node-info thrift?]
-        (let [host (:host node-info)
-              port (:port node-info)
-              profile-requests (get-topology-profile-requests this storm-id 
thrift?)]
-          (if thrift?
-            (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port 
(first (.get_port (.get_nodeInfo  %)))))
-                    profile-requests)
-            (filter #(and (= host (:host %)) (= port (:port %)))
-                    profile-requests))))
-      
-      (worker-heartbeat!
-        [this storm-id node port info]
-        (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
-          (if thrift-worker-hb
-            (.set_worker_hb cluster-state (workerbeat-path storm-id node port) 
(Utils/serialize thrift-worker-hb) acls))))
-
-      (remove-worker-heartbeat!
-        [this storm-id node port]
-        (.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))
-
-      (setup-heartbeats!
-        [this storm-id]
-        (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))
-
-      (teardown-heartbeats!
-        [this storm-id]
-        (try-cause
-          (.delete_worker_hb cluster-state (workerbeat-storm-root storm-id))
-          (catch KeeperException e
-            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
-
-      (worker-backpressure!
-        [this storm-id node port on?]
-        "if znode exists and to be not on?, delete; if exists and on?, do 
nothing;
-        if not exists and to be on?, create; if not exists and not on?, do 
nothing"
-        (let [path (backpressure-path storm-id node port)
-              existed (.node_exists cluster-state path false)]
-          (if existed
-            (if (not on?)
-              (.delete_node cluster-state path))   ;; delete the znode since 
the worker is not congested
-            (if on?
-              (.set_ephemeral_node cluster-state path nil acls))))) ;; create 
the znode since worker is congested
-    
-      (topology-backpressure
-        [this storm-id callback]
-        "if the backpresure/storm-id dir is empty, this topology has 
throttle-on, otherwise not."
-        (when callback
-          (swap! backpressure-callback assoc storm-id callback))
-        (let [path (backpressure-storm-root storm-id)
-              children (.get_children cluster-state path (not-nil? callback))]
-              (> (count children) 0)))
-      
-      (setup-backpressure!
-        [this storm-id]
-        (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
-
-      (remove-worker-backpressure!
-        [this storm-id node port]
-        (.delete_node cluster-state (backpressure-path storm-id node port)))
-
-      (teardown-topology-errors!
-        [this storm-id]
-        (try-cause
-          (.delete_node cluster-state (error-storm-root storm-id))
-          (catch KeeperException e
-            (log-warn-error e "Could not teardown errors for " storm-id))))
-
-      (supervisor-heartbeat!
-        [this supervisor-id info]
-        (let [thrift-supervisor-info (thriftify-supervisor-info info)]
-          (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) 
(Utils/serialize thrift-supervisor-info) acls)))
-
-      (activate-storm!
-        [this storm-id storm-base]
-        (let [thrift-storm-base (thriftify-storm-base storm-base)]
-          (.set_data cluster-state (storm-path storm-id) (Utils/serialize 
thrift-storm-base) acls)))
-
-      (update-storm!
-        [this storm-id new-elems]
-        (let [base (storm-base this storm-id nil)
-              executors (:component->executors base)
-              component->debug (:component->debug base)
-              new-elems (update new-elems :component->executors (partial merge 
executors))
-              new-elems (update new-elems :component->debug (partial 
merge-with merge component->debug))]
-          (.set_data cluster-state (storm-path storm-id)
-                    (-> base
-                        (merge new-elems)
-                        thriftify-storm-base
-                        Utils/serialize)
-                    acls)))
-
-      (storm-base
-        [this storm-id callback]
-        (when callback
-          (swap! storm-base-callback assoc storm-id callback))
-        (clojurify-storm-base (maybe-deserialize (.get_data cluster-state 
(storm-path storm-id) (not-nil? callback)) StormBase)))
-
-      (remove-storm-base!
-        [this storm-id]
-        (.delete_node cluster-state (storm-path storm-id)))
-
-      (set-assignment!
-        [this storm-id info]
-        (let [thrift-assignment (thriftify-assignment info)]
-          (.set_data cluster-state (assignment-path storm-id) (Utils/serialize 
thrift-assignment) acls)))
-
-      (remove-blobstore-key!
-        [this blob-key]
-        (log-debug "removing key" blob-key)
-        (.delete_node cluster-state (blobstore-path blob-key)))
-
-      (remove-key-version!
-        [this blob-key]
-        (.delete_node cluster-state (blobstore-max-key-sequence-number-path 
blob-key)))
-
-      (remove-storm!
-        [this storm-id]
-        (.delete_node cluster-state (assignment-path storm-id))
-        (.delete_node cluster-state (credentials-path storm-id))
-        (.delete_node cluster-state (log-config-path storm-id))
-        (.delete_node cluster-state (profiler-config-path storm-id))
-        (remove-storm-base! this storm-id))
-
-      (set-credentials!
-         [this storm-id creds topo-conf]
-         (let [topo-acls (mk-topo-only-acls topo-conf)
-               path (credentials-path storm-id)
-               thriftified-creds (thriftify-credentials creds)]
-           (.set_data cluster-state path (Utils/serialize thriftified-creds) 
topo-acls)))
-
-      (credentials
-        [this storm-id callback]
-        (when callback
-          (swap! credentials-callback assoc storm-id callback))
-        (clojurify-crdentials (maybe-deserialize (.get_data cluster-state 
(credentials-path storm-id) (not-nil? callback)) Credentials)))
-
-      (report-error
-         [this storm-id component-id node port error]
-         (let [path (error-path storm-id component-id)
-               last-error-path (last-error-path storm-id component-id)
-               data (thriftify-error {:time-secs (current-time-secs) :error 
(stringify-error error) :host node :port port})
-               _ (.mkdirs cluster-state path acls)
-               ser-data (Utils/serialize data)
-               _ (.mkdirs cluster-state path acls)
-               _ (.create_sequential cluster-state (str path "/e") ser-data 
acls)
-               _ (.set_data cluster-state last-error-path ser-data acls)
-               to-kill (->> (.get_children cluster-state path false)
-                            (sort-by parse-error-path)
-                            reverse
-                            (drop 10))]
-           (doseq [k to-kill]
-             (.delete_node cluster-state (str path "/" k)))))
-
-      (errors
-         [this storm-id component-id]
-         (let [path (error-path storm-id component-id)
-               errors (if (.node_exists cluster-state path false)
-                        (dofor [c (.get_children cluster-state path false)]
-                          (if-let [data (-> (.get_data cluster-state
-                                                      (str path "/" c)
-                                                      false)
-                                          (maybe-deserialize ErrorInfo)
-                                          clojurify-error)]
-                            (map->TaskError data)))
-                        ())]
-           (->> (filter not-nil? errors)
-                (sort-by (comp - :time-secs)))))
-
-      (last-error
-        [this storm-id component-id]
-        (let [path (last-error-path storm-id component-id)]
-          (if (.node_exists cluster-state path false)
-            (if-let [data (-> (.get_data cluster-state path false)
-                              (maybe-deserialize ErrorInfo)
-                              clojurify-error)]
-              (map->TaskError data)))))
-      
-      (disconnect
-         [this]
-        (.unregister cluster-state state-id)
-        (when solo?
-          (.close cluster-state))))))
-
-;; daemons have a single thread that will respond to events
-;; start with initialize event
-;; callbacks add events to the thread's queue
-
-;; keeps in memory cache of the state, only for what client subscribes to. Any subscription is automatically kept in sync, and when there are changes, client is notified.
-;; master gives orders through state, and client records status in state (ephemerally)
-
-;; master tells nodes what workers to launch
-
-;; master writes this. supervisors and workers subscribe to this to understand complete topology. each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified
-;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up
-;; /assignments/{storm id}
-
-;; which tasks they talk to, etc. (immutable until shutdown)
-;; everyone reads this in full to understand structure
-;; /tasks/{storm id}/{task id} ; just contains bolt id
-
-;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
-;; /supervisors/status/{ephemeral node ids}  ;; node metadata such as port ranges are kept here
-
-;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
-;; /taskbeats/{storm id}/{ephemeral task id}
-
-;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown
-;; master manipulates
-;; /storms/{storm id}
-
-;; Zookeeper flows:
-
-;; Master:
-;; job submit:
-;; 1. read which nodes are available
-;; 2. set up the worker/{storm}/{task} stuff (static)
-;; 3. set assignments
-;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
-
-;; Monitoring (or by checking when nodes go down or heartbeats aren't received):
-;; 1. read assignment
-;; 2. see which tasks/nodes are up
-;; 3. make new assignment to fix any problems
-;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)
-
-;; masters only possible watches is on ephemeral nodes and tasks, and maybe not even
-
-;; Supervisor:
-;; 1. monitor /storms/* and assignments
-;; 2. local state about which workers are local
-;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments
-;; 4. when storm is off, monitor tasks for workers - when they all die or don't hearbeat, kill the process and cleanup
-
-;; Worker:
-;; 1. On startup, start the tasks if the storm is on
-
-;; Task:
-;; 1. monitor assignments, reroute when assignments change
-;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off
-
-;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name)
-;; supervisor periodically checks to make sure processes are alive
-;; {rootdir}/workers/{storm id}/{worker id}   ;; contains pid inside
-
-;; all tasks in a worker share the same cluster state
-;; workers, supervisors, and tasks subscribes to storm to know when it's started or stopped
-;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear)
-;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9)
-;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. when node disappears, they can die

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
deleted file mode 100644
index fa36240..0000000
--- a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
+++ /dev/null
@@ -1,161 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.cluster-state.zookeeper-state-factory
-  (:import [org.apache.curator.framework.state ConnectionStateListener])
-  (:import [org.apache.zookeeper KeeperException$NoNodeException]
-           [backtype.storm.cluster ClusterState DaemonType])
-  (:use [backtype.storm cluster config log util])
-  (:require [backtype.storm [zookeeper :as zk]])
-  (:gen-class
-   :implements [backtype.storm.cluster.ClusterStateFactory]))
-
-(defn -mkState [this conf auth-conf acls context]
-  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
-    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
-    (.close zk))
-  (let [callbacks (atom {})
-        active (atom true)
-        zk-writer (zk/mk-client conf
-                         (conf STORM-ZOOKEEPER-SERVERS)
-                         (conf STORM-ZOOKEEPER-PORT)
-                         :auth-conf auth-conf
-                         :root (conf STORM-ZOOKEEPER-ROOT)
-                         :watcher (fn [state type path]
-                                    (when @active
-                                      (when-not (= :connected state)
-                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Writer Zookeeper."))
-                                      (when-not (= :none type)
-                                        (doseq [callback (vals @callbacks)]
-                                          (callback type path))))))
-        is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS)
-        zk-reader (if is-nimbus?
-                    (zk/mk-client conf
-                         (conf STORM-ZOOKEEPER-SERVERS)
-                         (conf STORM-ZOOKEEPER-PORT)
-                         :auth-conf auth-conf
-                         :root (conf STORM-ZOOKEEPER-ROOT)
-                         :watcher (fn [state type path]
-                                    (when @active
-                                      (when-not (= :connected state)
-                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Reader Zookeeper."))
-                                      (when-not (= :none type)
-                                        (doseq [callback (vals @callbacks)]
-                                          (callback type path))))))
-                    zk-writer)]
-    (reify
-     ClusterState
-
-     (register
-       [this callback]
-       (let [id (uuid)]
-         (swap! callbacks assoc id callback)
-         id))
-
-     (unregister
-       [this id]
-       (swap! callbacks dissoc id))
-
-     (set-ephemeral-node
-       [this path data acls]
-       (zk/mkdirs zk-writer (parent-path path) acls)
-       (if (zk/exists zk-writer path false)
-         (try-cause
-           (zk/set-data zk-writer path data) ; should verify that it's 
ephemeral
-           (catch KeeperException$NoNodeException e
-             (log-warn-error e "Ephemeral node disappeared between checking 
for existing and setting data")
-             (zk/create-node zk-writer path data :ephemeral acls)))
-         (zk/create-node zk-writer path data :ephemeral acls)))
-
-     (create-sequential
-       [this path data acls]
-       (zk/create-node zk-writer path data :sequential acls))
-
-     (set-data
-       [this path data acls]
-       ;; note: this does not turn off any existing watches
-       (if (zk/exists zk-writer path false)
-         (zk/set-data zk-writer path data)
-         (do
-           (zk/mkdirs zk-writer (parent-path path) acls)
-           (zk/create-node zk-writer path data :persistent acls))))
-
-     (set-worker-hb
-       [this path data acls]
-       (.set_data this path data acls))
-
-     (delete-node
-       [this path]
-       (zk/delete-node zk-writer path))
-
-     (delete-worker-hb
-       [this path]
-       (.delete_node this path))
-
-     (get-data
-       [this path watch?]
-       (zk/get-data zk-reader path watch?))
-
-     (get-data-with-version
-       [this path watch?]
-       (zk/get-data-with-version zk-reader path watch?))
-
-     (get-version
-       [this path watch?]
-       (zk/get-version zk-reader path watch?))
-
-     (get-worker-hb
-       [this path watch?]
-       (.get_data this path watch?))
-
-     (get-children
-       [this path watch?]
-       (zk/get-children zk-reader path watch?))
-
-     (get-worker-hb-children
-       [this path watch?]
-       (.get_children this path watch?))
-
-     (mkdirs
-       [this path acls]
-       (zk/mkdirs zk-writer path acls))
-
-     (node-exists
-       [this path watch?]
-       (zk/exists-node? zk-reader path watch?))
-
-     (add-listener
-       [this listener]
-       (let [curator-listener (reify ConnectionStateListener
-                                (stateChanged
-                                  [this client newState]
-                                  (.stateChanged listener client newState)))]
-         (zk/add-listener zk-reader curator-listener)))
-
-     (sync-path
-       [this path]
-       (zk/sync-path zk-writer path))
-
-      (delete-node-blobstore
-        [this path nimbus-host-port-info]
-        (zk/delete-node-blobstore zk-writer path nimbus-host-port-info))
-
-     (close
-       [this]
-       (reset! active false)
-       (.close zk-writer)
-       (if is-nimbus?
-         (.close zk-reader))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/activate.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/activate.clj b/storm-core/src/clj/backtype/storm/command/activate.clj
deleted file mode 100644
index 500e981..0000000
--- a/storm-core/src/clj/backtype/storm/command/activate.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.activate
-  (:use [backtype.storm thrift log])
-  (:gen-class))
-
-(defn -main [name] 
-  (with-configured-nimbus-connection nimbus
-    (.activate nimbus name)
-    (log-message "Activated topology: " name)
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/blobstore.clj b/storm-core/src/clj/backtype/storm/command/blobstore.clj
deleted file mode 100644
index ae7f919..0000000
--- a/storm-core/src/clj/backtype/storm/command/blobstore.clj
+++ /dev/null
@@ -1,162 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.blobstore
-  (:import [java.io InputStream OutputStream]
-           [backtype.storm.generated SettableBlobMeta AccessControl 
AuthorizationException
-            KeyNotFoundException]
-           [backtype.storm.blobstore BlobStoreAclHandler])
-  (:use [backtype.storm config]
-        [clojure.string :only [split]]
-        [clojure.tools.cli :only [cli]]
-        [clojure.java.io :only [copy input-stream output-stream]]
-        [backtype.storm blobstore log util])
-  (:gen-class))
-
-(defn update-blob-from-stream
-  "Update a blob in the blob store from an InputStream"
-  [key ^InputStream in]
-  (with-configured-blob-client blobstore
-    (let [out (.updateBlob blobstore key)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (log-message e)
-          (.cancel out)
-          (throw e))))))
-
-(defn create-blob-from-stream
-  "Create a blob in the blob store from an InputStream"
-  [key ^InputStream in ^SettableBlobMeta meta]
-  (with-configured-blob-client blobstore
-    (let [out (.createBlob blobstore key meta)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (.cancel out)
-          (throw e))))))
-
-(defn read-blob
-  "Read a blob in the blob store and write to an OutputStream"
-  [key ^OutputStream out]
-  (with-configured-blob-client blobstore
-    (with-open [in (.getBlob blobstore key)]
-      (copy in out))))
-
-(defn as-access-control
-  "Convert a parameter to an AccessControl object"
-  [param]
-  (BlobStoreAclHandler/parseAccessControl (str param)))
-
-(defn as-acl
-  [param]
-  (map as-access-control (split param #",")))
-
-(defn access-control-str
-  [^AccessControl acl]
-  (BlobStoreAclHandler/accessControlToString acl))
-
-(defn read-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (output-stream file)]
-        (read-blob key f))
-      (read-blob key System/out))))
-
-(defn update-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (input-stream file)]
-        (update-blob-from-stream key f))
-      (update-blob-from-stream key System/in))
-    (log-message "Successfully updated " key)))
-
-(defn create-cli [args]
-  (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] 
(cli args ["-f" "--file" :default nil]
-                                                  ["-a" "--acl" :default [] 
:parse-fn as-acl]
-                                                  ["-r" "--replication-factor" 
:default -1 :parse-fn parse-int])
-        meta (doto (SettableBlobMeta. acl)
-                   (.set_replication_factor replication-factor))]
-    (validate-key-name! key)
-    (log-message "Creating " key " with ACL " (pr-str (map access-control-str 
acl)))
-    (if file
-      (with-open [f (input-stream file)]
-        (create-blob-from-stream key f meta))
-      (create-blob-from-stream key System/in meta))
-    (log-message "Successfully created " key)))
-
-(defn delete-cli [args]
-  (with-configured-blob-client blobstore
-    (doseq [key args]
-      (.deleteBlob blobstore key)
-      (log-message "deleted " key))))
-
-(defn list-cli [args]
-  (with-configured-blob-client blobstore
-    (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
-      (doseq [key keys]
-        (try
-          (let [meta (.getBlobMeta blobstore key)
-                version (.get_version meta)
-                acl (.get_acl (.get_settable meta))]
-            (log-message key " " version " " (pr-str (map access-control-str 
acl))))
-          (catch AuthorizationException ae
-            (if-not (empty? args) (log-error "ACCESS DENIED to key: " key)))
-          (catch KeyNotFoundException knf
-            (if-not (empty? args) (log-error key " NOT FOUND"))))))))
-
-(defn set-acl-cli [args]
-  (let [[{set-acl :set} [key] _]
-           (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
-    (with-configured-blob-client blobstore
-      (let [meta (.getBlobMeta blobstore key)
-            acl (.get_acl (.get_settable meta))
-            new-acl (if set-acl set-acl acl)
-            new-meta (SettableBlobMeta. new-acl)]
-        (log-message "Setting ACL for " key " to " (pr-str (map 
access-control-str new-acl)))
-        (.setBlobMeta blobstore key new-meta)))))
-
-(defn rep-cli [args]
-  (let [sub-command (first args)
-        new-args (rest args)]
-    (with-configured-blob-client blobstore
-      (condp = sub-command
-      "--read" (let [key (first new-args)
-                     blob-replication (.getBlobReplication blobstore key)]
-                 (log-message "Current replication factor " blob-replication)
-                 blob-replication)
-      "--update" (let [[{replication-factor :replication-factor} [key] _]
-                        (cli new-args ["-r" "--replication-factor" :parse-fn 
parse-int])]
-                   (if (nil? replication-factor)
-                     (throw (RuntimeException. (str "Please set the 
replication factor")))
-                     (let [blob-replication (.updateBlobReplication blobstore 
key replication-factor)]
-                       (log-message "Replication factor is set to " 
blob-replication)
-                       blob-replication)))
-      :else (throw (RuntimeException. (str sub-command " is not a supported 
blobstore command")))))))
-
-(defn -main [& args]
-  (let [command (first args)
-        new-args (rest args)]
-    (condp = command
-      "cat" (read-cli new-args)
-      "create" (create-cli new-args)
-      "update" (update-cli new-args)
-      "delete" (delete-cli new-args)
-      "list" (list-cli new-args)
-      "set-acl" (set-acl-cli new-args)
-      "replication" (rep-cli new-args)
-      :else (throw (RuntimeException. (str command " is not a supported 
blobstore command"))))))

Reply via email to