http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java index cfda54e..384f97c 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java @@ -17,22 +17,22 @@ */ package org.apache.storm.redis.trident; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.trident.state.RedisMapState; import org.apache.storm.redis.common.config.JedisPoolConfig; -import storm.trident.Stream; -import storm.trident.TridentState; -import storm.trident.TridentTopology; -import storm.trident.operation.builtin.MapGet; -import storm.trident.operation.builtin.Sum; -import storm.trident.state.StateFactory; -import storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.MapGet; +import org.apache.storm.trident.operation.builtin.Sum; +import org.apache.storm.trident.state.StateFactory; +import 
org.apache.storm.trident.testing.FixedBatchSpout; public class WordCountTridentRedisMap { public static StormTopology buildTopology(String redisHost, Integer redisPort){
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java index 4a9599e..2c56c39 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java @@ -18,11 +18,11 @@ package org.apache.storm.solr.bolt; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java index 9fa38cd..d078959 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java @@ -20,7 +20,7 @@ package org.apache.storm.solr.mapper; import static 
org.apache.storm.solr.schema.SolrFieldTypeFinder.FieldTypeWrapper; -import backtype.storm.tuple.ITuple; +import org.apache.storm.tuple.ITuple; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.impl.CloudSolrClient; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java index f9d6e9b..704ec2d 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java @@ -18,7 +18,7 @@ package org.apache.storm.solr.mapper; -import backtype.storm.tuple.ITuple; +import org.apache.storm.tuple.ITuple; import com.google.gson.Gson; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java index 96242d1..5c0223b 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java @@ -18,8 +18,8 @@ package org.apache.storm.solr.mapper; -import backtype.storm.tuple.ITuple; -import backtype.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Tuple; import org.apache.solr.client.solrj.SolrRequest; 
import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java index 8187d11..d84d140 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java @@ -18,7 +18,7 @@ package org.apache.storm.solr.trident; -import backtype.storm.topology.FailedException; +import org.apache.storm.topology.FailedException; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.impl.CloudSolrClient; @@ -26,8 +26,8 @@ import org.apache.storm.solr.config.SolrConfig; import org.apache.storm.solr.mapper.SolrMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import storm.trident.state.State; -import storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; import java.util.List; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java index a1f815d..7b092ba 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java @@ -18,11 +18,11 @@ package org.apache.storm.solr.trident; -import 
backtype.storm.task.IMetricsContext; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.solr.config.SolrConfig; import org.apache.storm.solr.mapper.SolrMapper; -import storm.trident.state.State; -import storm.trident.state.StateFactory; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java index db7b995..53698fa 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java @@ -18,9 +18,9 @@ package org.apache.storm.solr.trident; -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseStateUpdater; -import storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseStateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; import java.util.List; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java index 7d2357c..8e3390d 100644 --- a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java @@ -18,12 +18,12 @@ package 
org.apache.storm.solr.spout; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import org.apache.storm.solr.util.TestUtil; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java index 6afed2c..bb0c83c 100644 --- a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java @@ -18,12 +18,12 @@ package org.apache.storm.solr.spout; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import com.google.gson.Gson; import 
org.apache.storm.solr.util.TestUtil; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java index 809f434..5c9f16d 100644 --- a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java @@ -18,8 +18,8 @@ package org.apache.storm.solr.topology; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.solr.bolt.SolrUpdateBolt; import org.apache.storm.solr.config.CountBasedCommit; import org.apache.storm.solr.config.SolrCommitStrategy; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java index 02e6d6f..24e6b5e 100644 --- a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java @@ -18,8 +18,8 @@ package org.apache.storm.solr.topology; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; import 
org.apache.storm.solr.bolt.SolrUpdateBolt; import org.apache.storm.solr.mapper.SolrJsonMapper; import org.apache.storm.solr.mapper.SolrMapper; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java index 607cf98..e0f4dc6 100644 --- a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java @@ -18,10 +18,10 @@ package org.apache.storm.solr.topology; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.storm.solr.config.SolrCommitStrategy; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java index 4884c82..d022c8a 100644 --- a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java @@ -18,13 +18,13 @@ package 
org.apache.storm.solr.trident; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; import org.apache.storm.solr.spout.SolrFieldsSpout; import org.apache.storm.solr.topology.SolrFieldsTopology; -import storm.trident.Stream; -import storm.trident.TridentTopology; -import storm.trident.state.StateFactory; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.state.StateFactory; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java index d03b1dd..75131b8 100644 --- a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java @@ -18,13 +18,13 @@ package org.apache.storm.solr.trident; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; import org.apache.storm.solr.spout.SolrJsonSpout; import org.apache.storm.solr.topology.SolrJsonTopology; -import storm.trident.Stream; -import storm.trident.TridentTopology; -import storm.trident.state.StateFactory; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.state.StateFactory; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/log4j2/cluster.xml 
---------------------------------------------------------------------- diff --git a/log4j2/cluster.xml b/log4j2/cluster.xml index f349d8c..ca333b2 100644 --- a/log4j2/cluster.xml +++ b/log4j2/cluster.xml @@ -73,15 +73,15 @@ </appenders> <loggers> - <Logger name="backtype.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false"> + <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false"> <AppenderRef ref="WEB-ACCESS"/> <AppenderRef ref="syslog"/> </Logger> - <Logger name="backtype.storm.logging.ThriftAccessLogger" level="info" additivity="false"> + <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" additivity="false"> <AppenderRef ref="THRIFT-ACCESS"/> <AppenderRef ref="syslog"/> </Logger> - <Logger name="backtype.storm.metric.LoggingMetricsConsumer" level="info"> + <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info"> <AppenderRef ref="METRICS"/> </Logger> <root level="info"> <!-- We log everything --> http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 181c2a9..bced62b 100644 --- a/pom.xml +++ b/pom.xml @@ -241,7 +241,7 @@ <maven-surefire.version>2.18.1</maven-surefire.version> <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile --> - <java.unit.test.exclude>backtype.storm.testing.IntegrationTest</java.unit.test.exclude> + <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude> <java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include> <!--maven surefire plugin default test list--> <!-- by default the clojure test set are all clojure tests that are not integration tests. 
This property is overridden in the profiles --> <clojure.test.set>!integration.*</clojure.test.set> @@ -364,7 +364,7 @@ <id>all-tests</id> <properties> <java.integration.test.include>**/*.java</java.integration.test.include> - <java.integration.test.group>backtype.storm.testing.IntegrationTest</java.integration.test.group> + <java.integration.test.group>org.apache.storm.testing.IntegrationTest</java.integration.test.group> <clojure.test.set>*.*</clojure.test.set> </properties> </profile> @@ -374,7 +374,7 @@ <!--Java--> <java.unit.test.include>no.unit.tests</java.unit.test.include> <java.integration.test.include>**/*.java</java.integration.test.include> - <java.integration.test.group>backtype.storm.testing.IntegrationTest</java.integration.test.group> + <java.integration.test.group>org.apache.storm.testing.IntegrationTest</java.integration.test.group> <!--Clojure--> <clojure.test.set>integration.*</clojure.test.set> <clojure.test.declared.namespace.only>true</clojure.test.declared.namespace.only> http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 765e798..08fffa4 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -1018,9 +1018,9 @@ <configuration> <append>true</append> <excludes> - <exclude>backtype/storm/metric/api/IMetricsConsumer$DataPointFieldAccess</exclude> - <exclude>backtype/storm/metric/api/IMetricsConsumer$TaskInfoFieldAccess</exclude> - <exclude>backtype/storm/testing/TestSerObjectFieldAccess</exclude> + <exclude>org/apache/storm/metric/api/IMetricsConsumer$DataPointFieldAccess</exclude> + <exclude>org/apache/storm/metric/api/IMetricsConsumer$TaskInfoFieldAccess</exclude> + <exclude>org/apache/storm/testing/TestSerObjectFieldAccess</exclude> </excludes> </configuration> </execution> @@ -1032,7 +1032,7 @@ </goals> <configuration> <excludes> - <exclude>backtype/storm/generated/*</exclude> 
<!--Thrift generated code--> + <exclude>org/apache/storm/generated/*</exclude> <!--Thrift generated code--> </excludes> <includes> <include>backtype/*/*/*/*</include> http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/LocalCluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/LocalCluster.clj b/storm-core/src/clj/backtype/storm/LocalCluster.clj deleted file mode 100644 index aa37c89..0000000 --- a/storm-core/src/clj/backtype/storm/LocalCluster.clj +++ /dev/null @@ -1,106 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
- -(ns backtype.storm.LocalCluster - (:use [backtype.storm testing config util]) - (:import [backtype.storm.utils Utils]) - (:import [java.util Map]) - (:gen-class - :init init - :implements [backtype.storm.ILocalCluster] - :constructors {[] [] - [java.util.Map] [] - [String Long] []} - :state state)) - -(defn -init - ([] - (let [ret (mk-local-storm-cluster - :daemon-conf - {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})] - [[] ret])) - ([^String zk-host ^Long zk-port] - (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true - STORM-ZOOKEEPER-SERVERS (list zk-host) - STORM-ZOOKEEPER-PORT zk-port})] - [[] ret])) - ([^Map stateMap] - [[] stateMap])) - -(defn submit-hook [hook name conf topology] - (let [topologyInfo (Utils/getTopologyInfo name nil conf)] - (.notify hook topologyInfo conf topology))) - -(defn -submitTopology - [this name conf topology] - (submit-local-topology - (:nimbus (. this state)) name conf topology) - (let [hook (get-configured-class conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)] - (when hook (submit-hook hook name conf topology)))) - - -(defn -submitTopologyWithOpts - [this name conf topology submit-opts] - (submit-local-topology-with-opts - (:nimbus (. this state)) name conf topology submit-opts)) - -(defn -uploadNewCredentials - [this name creds] - (.uploadNewCredentials (:nimbus (. this state)) name creds)) - -(defn -shutdown - [this] - (kill-local-storm-cluster (. this state))) - -(defn -killTopology - [this name] - (.killTopology (:nimbus (. this state)) name)) - -(defn -getTopologyConf - [this id] - (.getTopologyConf (:nimbus (. this state)) id)) - -(defn -getTopology - [this id] - (.getTopology (:nimbus (. this state)) id)) - -(defn -getClusterInfo - [this] - (.getClusterInfo (:nimbus (. this state)))) - -(defn -getTopologyInfo - [this id] - (.getTopologyInfo (:nimbus (. this state)) id)) - -(defn -killTopologyWithOpts - [this name opts] - (.killTopologyWithOpts (:nimbus (. 
this state)) name opts)) - -(defn -activate - [this name] - (.activate (:nimbus (. this state)) name)) - -(defn -deactivate - [this name] - (.deactivate (:nimbus (. this state)) name)) - -(defn -rebalance - [this name opts] - (.rebalance (:nimbus (. this state)) name opts)) - -(defn -getState - [this] - (.state this)) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/LocalDRPC.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/LocalDRPC.clj b/storm-core/src/clj/backtype/storm/LocalDRPC.clj deleted file mode 100644 index 9773821..0000000 --- a/storm-core/src/clj/backtype/storm/LocalDRPC.clj +++ /dev/null @@ -1,56 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
- -(ns backtype.storm.LocalDRPC - (:require [backtype.storm.daemon [drpc :as drpc]]) - (:use [backtype.storm config util]) - (:import [backtype.storm.utils InprocMessaging ServiceRegistry]) - (:gen-class - :init init - :implements [backtype.storm.ILocalDRPC] - :constructors {[] []} - :state state )) - -(defn -init [] - (let [handler (drpc/service-handler (read-storm-config)) - id (ServiceRegistry/registerService handler) - ] - [[] {:service-id id :handler handler}] - )) - -(defn -execute [this func funcArgs] - (.execute (:handler (. this state)) func funcArgs) - ) - -(defn -result [this id result] - (.result (:handler (. this state)) id result) - ) - -(defn -fetchRequest [this func] - (.fetchRequest (:handler (. this state)) func) - ) - -(defn -failRequest [this id] - (.failRequest (:handler (. this state)) id) - ) - -(defn -getServiceId [this] - (:service-id (. this state))) - -(defn -shutdown [this] - (ServiceRegistry/unregisterService (:service-id (. this state))) - (.shutdown (:handler (. this state))) - ) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/MockAutoCred.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/MockAutoCred.clj b/storm-core/src/clj/backtype/storm/MockAutoCred.clj deleted file mode 100644 index 5e37528..0000000 --- a/storm-core/src/clj/backtype/storm/MockAutoCred.clj +++ /dev/null @@ -1,58 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. 
You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -;;mock implementation of INimbusCredentialPlugin,IAutoCredentials and ICredentialsRenewer for testing only. -(ns backtype.storm.MockAutoCred - (:use [backtype.storm testing config]) - (:import [backtype.storm.security.INimbusCredentialPlugin] - [backtype.storm.security.auth ICredentialsRenewer]) - (:gen-class - :implements [backtype.storm.security.INimbusCredentialPlugin - backtype.storm.security.auth.IAutoCredentials - backtype.storm.security.auth.ICredentialsRenewer])) - -(def nimbus-cred-key "nimbusCredTestKey") -(def nimbus-cred-val "nimbusTestCred") -(def nimbus-cred-renew-val "renewedNimbusTestCred") -(def gateway-cred-key "gatewayCredTestKey") -(def gateway-cred-val "gatewayTestCred") -(def gateway-cred-renew-val "renewedGatewayTestCred") - -(defn -populateCredentials - ([this creds conf] - (.put creds nimbus-cred-key nimbus-cred-val)) - ([this creds] - (.put creds gateway-cred-key gateway-cred-val))) - -(defn -prepare - [this conf]) - -(defn -renew - [this cred conf] - (.put cred nimbus-cred-key nimbus-cred-renew-val) - (.put cred gateway-cred-key gateway-cred-renew-val)) - -(defn -populateSubject - [subject credentials] - (.add (.getPublicCredentials subject) (.get credentials nimbus-cred-key)) - (.add (.getPublicCredentials subject) (.get credentials gateway-cred-key))) - -(defn -updateSubject - [subject credentials] - (-populateSubject subject credentials)) - - - http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/blobstore.clj ---------------------------------------------------------------------- 
diff --git a/storm-core/src/clj/backtype/storm/blobstore.clj b/storm-core/src/clj/backtype/storm/blobstore.clj deleted file mode 100644 index 936f4b5..0000000 --- a/storm-core/src/clj/backtype/storm/blobstore.clj +++ /dev/null @@ -1,28 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns backtype.storm.blobstore - (:import [backtype.storm.utils Utils]) - (:import [backtype.storm.blobstore ClientBlobStore]) - (:use [backtype.storm config])) - -(defmacro with-configured-blob-client - [client-sym & body] - `(let [conf# (read-storm-config) - ^ClientBlobStore ~client-sym (Utils/getClientBlobStore conf#)] - (try - ~@body - (finally (.shutdown ~client-sym))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/clojure.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/clojure.clj b/storm-core/src/clj/backtype/storm/clojure.clj deleted file mode 100644 index a73166a..0000000 --- a/storm-core/src/clj/backtype/storm/clojure.clj +++ /dev/null @@ -1,201 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. 
See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns backtype.storm.clojure - (:use [backtype.storm util]) - (:import [backtype.storm StormSubmitter]) - (:import [backtype.storm.generated StreamInfo]) - (:import [backtype.storm.tuple Tuple]) - (:import [backtype.storm.task OutputCollector IBolt TopologyContext]) - (:import [backtype.storm.spout SpoutOutputCollector ISpout]) - (:import [backtype.storm.utils Utils]) - (:import [backtype.storm.clojure ClojureBolt ClojureSpout]) - (:import [java.util List]) - (:require [backtype.storm [thrift :as thrift]])) - -(defn direct-stream [fields] - (StreamInfo. fields true)) - -(defn to-spec [avar] - (let [m (meta avar)] - [(str (:ns m)) (str (:name m))])) - -(defn clojure-bolt* [output-spec fn-var conf-fn-var args] - (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec))) - -(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args] - `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args)) - -(defn clojure-spout* [output-spec fn-var conf-var args] - (let [m (meta fn-var)] - (ClojureSpout. 
(to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec)) - )) - -(defmacro clojure-spout [output-spec fn-sym conf-sym args] - `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args)) - -(defn normalize-fns [body] - (for [[name args & impl] body - :let [args (-> "this" - gensym - (cons args) - vec)]] - (concat [name args] impl) - )) - -(defmacro bolt [& body] - (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body) - fns (normalize-fns bolt-fns)] - `(reify IBolt - ~@fns - ~@other-fns))) - -(defmacro bolt-execute [& body] - `(bolt - (~'execute ~@body))) - -(defmacro spout [& body] - (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body) - fns (normalize-fns spout-fns)] - `(reify ISpout - ~@fns - ~@other-fns))) - -(defmacro defbolt [name output-spec & [opts & impl :as all]] - (if-not (map? opts) - `(defbolt ~name ~output-spec {} ~@all) - (let [worker-name (symbol (str name "__")) - conf-fn-name (symbol (str name "__conf__")) - params (:params opts) - conf-code (:conf opts) - fn-body (if (:prepare opts) - (cons 'fn impl) - (let [[args & impl-body] impl - coll-sym (nth args 1) - args (vec (take 1 args)) - prepargs [(gensym "conf") (gensym "context") coll-sym]] - `(fn ~prepargs (bolt (~'execute ~args ~@impl-body))))) - definer (if params - `(defn ~name [& args#] - (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#)) - `(def ~name - (clojure-bolt ~output-spec ~worker-name ~conf-fn-name [])) - ) - ] - `(do - (defn ~conf-fn-name ~(if params params []) - ~conf-code - ) - (defn ~worker-name ~(if params params []) - ~fn-body - ) - ~definer - )))) - -(defmacro defspout [name output-spec & [opts & impl :as all]] - (if-not (map? opts) - `(defspout ~name ~output-spec {} ~@all) - (let [worker-name (symbol (str name "__")) - conf-fn-name (symbol (str name "__conf__")) - params (:params opts) - conf-code (:conf opts) - prepare? (:prepare opts) - prepare? (if (nil? prepare?) true prepare?) - fn-body (if prepare? 
- (cons 'fn impl) - (let [[args & impl-body] impl - coll-sym (first args) - prepargs [(gensym "conf") (gensym "context") coll-sym]] - `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body))))) - definer (if params - `(defn ~name [& args#] - (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#)) - `(def ~name - (clojure-spout ~output-spec ~worker-name ~conf-fn-name [])) - ) - ] - `(do - (defn ~conf-fn-name ~(if params params []) - ~conf-code - ) - (defn ~worker-name ~(if params params []) - ~fn-body - ) - ~definer - )))) - -(defprotocol TupleValues - (tuple-values [values collector stream])) - -(extend-protocol TupleValues - java.util.Map - (tuple-values [this collector ^String stream] - (let [^TopologyContext context (:context collector) - fields (.. context (getThisOutputFields stream) toList) ] - (vec (map (into - (empty this) (for [[k v] this] - [(if (keyword? k) (name k) k) v])) - fields)))) - java.util.List - (tuple-values [this collector stream] - this)) - -(defnk emit-bolt! [collector values - :stream Utils/DEFAULT_STREAM_ID :anchor []] - (let [^List anchor (collectify anchor) - values (tuple-values values collector stream) ] - (.emit ^OutputCollector (:output-collector collector) stream anchor values) - )) - -(defnk emit-direct-bolt! [collector task values - :stream Utils/DEFAULT_STREAM_ID :anchor []] - (let [^List anchor (collectify anchor) - values (tuple-values values collector stream) ] - (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values) - )) - -(defn ack! [collector ^Tuple tuple] - (.ack ^OutputCollector (:output-collector collector) tuple)) - -(defn fail! [collector ^Tuple tuple] - (.fail ^OutputCollector (:output-collector collector) tuple)) - -(defn report-error! [collector ^Tuple tuple] - (.reportError ^OutputCollector (:output-collector collector) tuple)) - -(defnk emit-spout! 
[collector values - :stream Utils/DEFAULT_STREAM_ID :id nil] - (let [values (tuple-values values collector stream)] - (.emit ^SpoutOutputCollector (:output-collector collector) stream values id))) - -(defnk emit-direct-spout! [collector task values - :stream Utils/DEFAULT_STREAM_ID :id nil] - (let [values (tuple-values values collector stream)] - (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id))) - -(defalias topology thrift/mk-topology) -(defalias bolt-spec thrift/mk-bolt-spec) -(defalias spout-spec thrift/mk-spout-spec) -(defalias shell-bolt-spec thrift/mk-shell-bolt-spec) -(defalias shell-spout-spec thrift/mk-shell-spout-spec) - -(defn submit-remote-topology [name conf topology] - (StormSubmitter/submitTopology name conf topology)) - -(defn local-cluster [] - ;; do this to avoid a cyclic dependency of - ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster - (eval '(new backtype.storm.LocalCluster))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj deleted file mode 100644 index ebe4955..0000000 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ /dev/null @@ -1,691 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. 
You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns backtype.storm.cluster - (:import [org.apache.zookeeper.data Stat ACL Id] - [backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary - LogConfig ProfileAction ProfileRequest NodeInfo] - [java.io Serializable]) - (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms]) - (:import [org.apache.curator.framework CuratorFramework]) - (:import [backtype.storm.utils Utils]) - (:import [backtype.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState]) - (:import [java.security MessageDigest]) - (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider]) - (:import [backtype.storm.nimbus NimbusInfo]) - (:use [backtype.storm util log config converter]) - (:require [backtype.storm [zookeeper :as zk]]) - (:require [backtype.storm.daemon [common :as common]])) - -(defn mk-topo-only-acls - [topo-conf] - (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)] - (when (Utils/isZkAuthenticationConfiguredTopology topo-conf) - [(first ZooDefs$Ids/CREATOR_ALL_ACL) - (ACL. ZooDefs$Perms/READ (Id. 
"digest" (DigestAuthenticationProvider/generateDigest payload)))]))) - -(defnk mk-distributed-cluster-state - [conf :auth-conf nil :acls nil :context (ClusterStateContext.)] - (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE) - "backtype.storm.cluster_state.zookeeper_state_factory")) - state-instance (.newInstance clazz)] - (log-debug "Creating cluster state: " (.toString clazz)) - (or (.mkState state-instance conf auth-conf acls context) - nil))) - -(defprotocol StormClusterState - (assignments [this callback]) - (assignment-info [this storm-id callback]) - (assignment-info-with-version [this storm-id callback]) - (assignment-version [this storm-id callback]) - ;returns key information under /storm/blobstore/key - (blobstore-info [this blob-key]) - ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data> - (nimbuses [this]) - ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id - (add-nimbus-host! [this nimbus-id nimbus-summary]) - - (active-storms [this]) - (storm-base [this storm-id callback]) - (get-worker-heartbeat [this storm-id node port]) - (get-worker-profile-requests [this storm-id nodeinfo thrift?]) - (get-topology-profile-requests [this storm-id thrift?]) - (set-worker-profile-request [this storm-id profile-request]) - (delete-topology-profile-requests [this storm-id profile-request]) - (executor-beats [this storm-id executor->node+port]) - (supervisors [this callback]) - (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist - (setup-heartbeats! [this storm-id]) - (teardown-heartbeats! [this storm-id]) - (teardown-topology-errors! [this storm-id]) - (heartbeat-storms [this]) - (error-topologies [this]) - (set-topology-log-config! [this storm-id log-config]) - (topology-log-config [this storm-id cb]) - (worker-heartbeat! [this storm-id node port info]) - (remove-worker-heartbeat! [this storm-id node port]) - (supervisor-heartbeat! [this supervisor-id info]) - (worker-backpressure! 
[this storm-id node port info]) - (topology-backpressure [this storm-id callback]) - (setup-backpressure! [this storm-id]) - (remove-worker-backpressure! [this storm-id node port]) - (activate-storm! [this storm-id storm-base]) - (update-storm! [this storm-id new-elems]) - (remove-storm-base! [this storm-id]) - (set-assignment! [this storm-id info]) - ;; sets up information related to key consisting of nimbus - ;; host:port and version info of the blob - (setup-blobstore! [this key nimbusInfo versionInfo]) - (active-keys [this]) - (blobstore [this callback]) - (remove-storm! [this storm-id]) - (remove-blobstore-key! [this blob-key]) - (remove-key-version! [this blob-key]) - (report-error [this storm-id component-id node port error]) - (errors [this storm-id component-id]) - (last-error [this storm-id component-id]) - (set-credentials! [this storm-id creds topo-conf]) - (credentials [this storm-id callback]) - (disconnect [this])) - -(def ASSIGNMENTS-ROOT "assignments") -(def CODE-ROOT "code") -(def STORMS-ROOT "storms") -(def SUPERVISORS-ROOT "supervisors") -(def WORKERBEATS-ROOT "workerbeats") -(def BACKPRESSURE-ROOT "backpressure") -(def ERRORS-ROOT "errors") -(def BLOBSTORE-ROOT "blobstore") -; Stores the latest update sequence for a blob -(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber") -(def NIMBUSES-ROOT "nimbuses") -(def CREDENTIALS-ROOT "credentials") -(def LOGCONFIG-ROOT "logconfigs") -(def PROFILERCONFIG-ROOT "profilerconfigs") - -(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) -(def STORMS-SUBTREE (str "/" STORMS-ROOT)) -(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT)) -(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT)) -(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT)) -(def ERRORS-SUBTREE (str "/" ERRORS-ROOT)) -;; Blobstore subtree /storm/blobstore -(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT)) -(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT)) -(def 
NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT)) -(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT)) -(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT)) -(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT)) - -(defn supervisor-path - [id] - (str SUPERVISORS-SUBTREE "/" id)) - -(defn assignment-path - [id] - (str ASSIGNMENTS-SUBTREE "/" id)) - -(defn blobstore-path - [key] - (str BLOBSTORE-SUBTREE "/" key)) - -(defn blobstore-max-key-sequence-number-path - [key] - (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key)) - -(defn nimbus-path - [id] - (str NIMBUSES-SUBTREE "/" id)) - -(defn storm-path - [id] - (str STORMS-SUBTREE "/" id)) - -(defn workerbeat-storm-root - [storm-id] - (str WORKERBEATS-SUBTREE "/" storm-id)) - -(defn workerbeat-path - [storm-id node port] - (str (workerbeat-storm-root storm-id) "/" node "-" port)) - -(defn backpressure-storm-root - [storm-id] - (str BACKPRESSURE-SUBTREE "/" storm-id)) - -(defn backpressure-path - [storm-id node port] - (str (backpressure-storm-root storm-id) "/" node "-" port)) - -(defn error-storm-root - [storm-id] - (str ERRORS-SUBTREE "/" storm-id)) - -(defn error-path - [storm-id component-id] - (str (error-storm-root storm-id) "/" (url-encode component-id))) - -(def last-error-path-seg "last-error") - -(defn last-error-path - [storm-id component-id] - (str (error-storm-root storm-id) - "/" - (url-encode component-id) - "-" - last-error-path-seg)) - -(defn credentials-path - [storm-id] - (str CREDENTIALS-SUBTREE "/" storm-id)) - -(defn log-config-path - [storm-id] - (str LOGCONFIG-SUBTREE "/" storm-id)) - -(defn profiler-config-path - ([storm-id] - (str PROFILERCONFIG-SUBTREE "/" storm-id)) - ([storm-id host port request-type] - (str (profiler-config-path storm-id) "/" host "_" port "_" request-type))) - -(defn- issue-callback! - [cb-atom] - (let [cb @cb-atom] - (reset! cb-atom nil) - (when cb - (cb)))) - -(defn- issue-map-callback! - [cb-atom id] - (let [cb (@cb-atom id)] - (swap! 
cb-atom dissoc id) - (when cb - (cb id)))) - -(defn- maybe-deserialize - [ser clazz] - (when ser - (Utils/deserialize ser clazz))) - -(defrecord TaskError [error time-secs host port]) - -(defn- parse-error-path - [^String p] - (Long/parseLong (.substring p 1))) - -(defn convert-executor-beats - "Ensures that we only return heartbeats for executors assigned to - this worker." - [executors worker-hb] - (let [executor-stats (:executor-stats worker-hb)] - (->> executors - (map (fn [t] - (if (contains? executor-stats t) - {t {:time-secs (:time-secs worker-hb) - :uptime (:uptime worker-hb) - :stats (get executor-stats t)}}))) - (into {})))) - -;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called. -(defnk mk-storm-cluster-state - [cluster-state-spec :acls nil :context (ClusterStateContext.)] - (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec) - [false cluster-state-spec] - [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)]) - assignment-info-callback (atom {}) - assignment-info-with-version-callback (atom {}) - assignment-version-callback (atom {}) - supervisors-callback (atom nil) - backpressure-callback (atom {}) ;; we want to reigister a topo directory getChildren callback for all workers of this dir - assignments-callback (atom nil) - storm-base-callback (atom {}) - blobstore-callback (atom nil) - credentials-callback (atom {}) - log-config-callback (atom {}) - state-id (.register - cluster-state - (fn [type path] - (let [[subtree & args] (tokenize-path path)] - (condp = subtree - ASSIGNMENTS-ROOT (if (empty? args) - (issue-callback! assignments-callback) - (do - (issue-map-callback! assignment-info-callback (first args)) - (issue-map-callback! assignment-version-callback (first args)) - (issue-map-callback! assignment-info-with-version-callback (first args)))) - SUPERVISORS-ROOT (issue-callback! 
supervisors-callback) - BLOBSTORE-ROOT (issue-callback! blobstore-callback) ;; callback register for blobstore - STORMS-ROOT (issue-map-callback! storm-base-callback (first args)) - CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args)) - LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args)) - BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args)) - ;; this should never happen - (exit-process! 30 "Unknown callback for subtree " subtree args)))))] - (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE - LOGCONFIG-SUBTREE]] - (.mkdirs cluster-state p acls)) - (reify - StormClusterState - - (assignments - [this callback] - (when callback - (reset! assignments-callback callback)) - (.get_children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback))) - - (assignment-info - [this storm-id callback] - (when callback - (swap! assignment-info-callback assoc storm-id callback)) - (clojurify-assignment (maybe-deserialize (.get_data cluster-state (assignment-path storm-id) (not-nil? callback)) Assignment))) - - (assignment-info-with-version - [this storm-id callback] - (when callback - (swap! assignment-info-with-version-callback assoc storm-id callback)) - (let [{data :data version :version} - (.get_data_with_version cluster-state (assignment-path storm-id) (not-nil? callback))] - {:data (clojurify-assignment (maybe-deserialize data Assignment)) - :version version})) - - (assignment-version - [this storm-id callback] - (when callback - (swap! assignment-version-callback assoc storm-id callback)) - (.get_version cluster-state (assignment-path storm-id) (not-nil? callback))) - - ;; blobstore state - (blobstore - [this callback] - (when callback - (reset! blobstore-callback callback)) - (.sync_path cluster-state BLOBSTORE-SUBTREE) - (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? 
callback))) - - (nimbuses - [this] - (map #(maybe-deserialize (.get_data cluster-state (nimbus-path %1) false) NimbusSummary) - (.get_children cluster-state NIMBUSES-SUBTREE false))) - - (add-nimbus-host! - [this nimbus-id nimbus-summary] - ;explicit delete for ephmeral node to ensure this session creates the entry. - (.delete_node cluster-state (nimbus-path nimbus-id)) - - (.add_listener cluster-state (reify ClusterStateListener - (^void stateChanged[this ^ConnectionState newState] - (log-message "Connection state listener invoked, zookeeper connection state has changed to " newState) - (if (.equals newState ConnectionState/RECONNECTED) - (do - (log-message "Connection state has changed to reconnected so setting nimbuses entry one more time") - (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls)))))) - - (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls)) - - (setup-blobstore! - [this key nimbusInfo versionInfo] - (let [path (str (blobstore-path key) "/" (.toHostPortString nimbusInfo) "-" versionInfo)] - (log-message "setup-path" path) - (.mkdirs cluster-state (blobstore-path key) acls) - ;we delete the node first to ensure the node gets created as part of this session only. 
- (.delete_node_blobstore cluster-state (str (blobstore-path key)) (.toHostPortString nimbusInfo)) - (.set_ephemeral_node cluster-state path nil acls))) - - (blobstore-info - [this blob-key] - (let [path (blobstore-path blob-key)] - (.sync_path cluster-state path) - (.get_children cluster-state path false))) - - (active-storms - [this] - (.get_children cluster-state STORMS-SUBTREE false)) - - (active-keys - [this] - (.get_children cluster-state BLOBSTORE-SUBTREE false)) - - (heartbeat-storms - [this] - (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false)) - - (error-topologies - [this] - (.get_children cluster-state ERRORS-SUBTREE false)) - - (get-worker-heartbeat - [this storm-id node port] - (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path storm-id node port) false)] - (if worker-hb - (-> worker-hb - (maybe-deserialize ClusterWorkerHeartbeat) - clojurify-zk-worker-hb)))) - - (executor-beats - [this storm-id executor->node+port] - ;; need to take executor->node+port in explicitly so that we don't run into a situation where a - ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats - ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, - ;; we avoid situations like that - (let [node+port->executors (reverse-map executor->node+port) - all-heartbeats (for [[[node port] executors] node+port->executors] - (->> (get-worker-heartbeat this storm-id node port) - (convert-executor-beats executors) - ))] - (apply merge all-heartbeats))) - - (supervisors - [this callback] - (when callback - (reset! supervisors-callback callback)) - (.get_children cluster-state SUPERVISORS-SUBTREE (not-nil? callback))) - - (supervisor-info - [this supervisor-id] - (clojurify-supervisor-info (maybe-deserialize (.get_data cluster-state (supervisor-path supervisor-id) false) SupervisorInfo))) - - (topology-log-config - [this storm-id cb] - (when cb - (swap! 
log-config-callback assoc storm-id cb)) - (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) (not-nil? cb)) LogConfig)) - - (set-topology-log-config! - [this storm-id log-config] - (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls)) - - (set-worker-profile-request - [this storm-id profile-request] - (let [request-type (.get_action profile-request) - host (.get_node (.get_nodeInfo profile-request)) - port (first (.get_port (.get_nodeInfo profile-request)))] - (.set_data cluster-state - (profiler-config-path storm-id host port request-type) - (Utils/serialize profile-request) - acls))) - - (get-topology-profile-requests - [this storm-id thrift?] - (let [path (profiler-config-path storm-id) - requests (if (.node_exists cluster-state path false) - (dofor [c (.get_children cluster-state path false)] - (let [raw (.get_data cluster-state (str path "/" c) false) - request (maybe-deserialize raw ProfileRequest)] - (if thrift? - request - (clojurify-profile-request request)))))] - requests)) - - (delete-topology-profile-requests - [this storm-id profile-request] - (let [profile-request-inst (thriftify-profile-request profile-request) - action (:action profile-request) - host (:host profile-request) - port (:port profile-request)] - (.delete_node cluster-state - (profiler-config-path storm-id host port action)))) - - (get-worker-profile-requests - [this storm-id node-info thrift?] - (let [host (:host node-info) - port (:port node-info) - profile-requests (get-topology-profile-requests this storm-id thrift?)] - (if thrift? - (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port (first (.get_port (.get_nodeInfo %))))) - profile-requests) - (filter #(and (= host (:host %)) (= port (:port %))) - profile-requests)))) - - (worker-heartbeat! 
- [this storm-id node port info] - (let [thrift-worker-hb (thriftify-zk-worker-hb info)] - (if thrift-worker-hb - (.set_worker_hb cluster-state (workerbeat-path storm-id node port) (Utils/serialize thrift-worker-hb) acls)))) - - (remove-worker-heartbeat! - [this storm-id node port] - (.delete_worker_hb cluster-state (workerbeat-path storm-id node port))) - - (setup-heartbeats! - [this storm-id] - (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls)) - - (teardown-heartbeats! - [this storm-id] - (try-cause - (.delete_worker_hb cluster-state (workerbeat-storm-root storm-id)) - (catch KeeperException e - (log-warn-error e "Could not teardown heartbeats for " storm-id)))) - - (worker-backpressure! - [this storm-id node port on?] - "if znode exists and to be not on?, delete; if exists and on?, do nothing; - if not exists and to be on?, create; if not exists and not on?, do nothing" - (let [path (backpressure-path storm-id node port) - existed (.node_exists cluster-state path false)] - (if existed - (if (not on?) - (.delete_node cluster-state path)) ;; delete the znode since the worker is not congested - (if on? - (.set_ephemeral_node cluster-state path nil acls))))) ;; create the znode since worker is congested - - (topology-backpressure - [this storm-id callback] - "if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not." - (when callback - (swap! backpressure-callback assoc storm-id callback)) - (let [path (backpressure-storm-root storm-id) - children (.get_children cluster-state path (not-nil? callback))] - (> (count children) 0))) - - (setup-backpressure! - [this storm-id] - (.mkdirs cluster-state (backpressure-storm-root storm-id) acls)) - - (remove-worker-backpressure! - [this storm-id node port] - (.delete_node cluster-state (backpressure-path storm-id node port))) - - (teardown-topology-errors! 
- [this storm-id] - (try-cause - (.delete_node cluster-state (error-storm-root storm-id)) - (catch KeeperException e - (log-warn-error e "Could not teardown errors for " storm-id)))) - - (supervisor-heartbeat! - [this supervisor-id info] - (let [thrift-supervisor-info (thriftify-supervisor-info info)] - (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls))) - - (activate-storm! - [this storm-id storm-base] - (let [thrift-storm-base (thriftify-storm-base storm-base)] - (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls))) - - (update-storm! - [this storm-id new-elems] - (let [base (storm-base this storm-id nil) - executors (:component->executors base) - component->debug (:component->debug base) - new-elems (update new-elems :component->executors (partial merge executors)) - new-elems (update new-elems :component->debug (partial merge-with merge component->debug))] - (.set_data cluster-state (storm-path storm-id) - (-> base - (merge new-elems) - thriftify-storm-base - Utils/serialize) - acls))) - - (storm-base - [this storm-id callback] - (when callback - (swap! storm-base-callback assoc storm-id callback)) - (clojurify-storm-base (maybe-deserialize (.get_data cluster-state (storm-path storm-id) (not-nil? callback)) StormBase))) - - (remove-storm-base! - [this storm-id] - (.delete_node cluster-state (storm-path storm-id))) - - (set-assignment! - [this storm-id info] - (let [thrift-assignment (thriftify-assignment info)] - (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls))) - - (remove-blobstore-key! - [this blob-key] - (log-debug "removing key" blob-key) - (.delete_node cluster-state (blobstore-path blob-key))) - - (remove-key-version! - [this blob-key] - (.delete_node cluster-state (blobstore-max-key-sequence-number-path blob-key))) - - (remove-storm! 
- [this storm-id] - (.delete_node cluster-state (assignment-path storm-id)) - (.delete_node cluster-state (credentials-path storm-id)) - (.delete_node cluster-state (log-config-path storm-id)) - (.delete_node cluster-state (profiler-config-path storm-id)) - (remove-storm-base! this storm-id)) - - (set-credentials! - [this storm-id creds topo-conf] - (let [topo-acls (mk-topo-only-acls topo-conf) - path (credentials-path storm-id) - thriftified-creds (thriftify-credentials creds)] - (.set_data cluster-state path (Utils/serialize thriftified-creds) topo-acls))) - - (credentials - [this storm-id callback] - (when callback - (swap! credentials-callback assoc storm-id callback)) - (clojurify-crdentials (maybe-deserialize (.get_data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials))) - - (report-error - [this storm-id component-id node port error] - (let [path (error-path storm-id component-id) - last-error-path (last-error-path storm-id component-id) - data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}) - _ (.mkdirs cluster-state path acls) - ser-data (Utils/serialize data) - _ (.mkdirs cluster-state path acls) - _ (.create_sequential cluster-state (str path "/e") ser-data acls) - _ (.set_data cluster-state last-error-path ser-data acls) - to-kill (->> (.get_children cluster-state path false) - (sort-by parse-error-path) - reverse - (drop 10))] - (doseq [k to-kill] - (.delete_node cluster-state (str path "/" k))))) - - (errors - [this storm-id component-id] - (let [path (error-path storm-id component-id) - errors (if (.node_exists cluster-state path false) - (dofor [c (.get_children cluster-state path false)] - (if-let [data (-> (.get_data cluster-state - (str path "/" c) - false) - (maybe-deserialize ErrorInfo) - clojurify-error)] - (map->TaskError data))) - ())] - (->> (filter not-nil? 
errors) - (sort-by (comp - :time-secs))))) - - (last-error - [this storm-id component-id] - (let [path (last-error-path storm-id component-id)] - (if (.node_exists cluster-state path false) - (if-let [data (-> (.get_data cluster-state path false) - (maybe-deserialize ErrorInfo) - clojurify-error)] - (map->TaskError data))))) - - (disconnect - [this] - (.unregister cluster-state state-id) - (when solo? - (.close cluster-state)))))) - -;; daemons have a single thread that will respond to events -;; start with initialize event -;; callbacks add events to the thread's queue - -;; keeps in memory cache of the state, only for what client subscribes to. Any subscription is automatically kept in sync, and when there are changes, client is notified. -;; master gives orders through state, and client records status in state (ephemerally) - -;; master tells nodes what workers to launch - -;; master writes this. supervisors and workers subscribe to this to understand complete topology. each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified -;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up -;; /assignments/{storm id} - -;; which tasks they talk to, etc. (immutable until shutdown) -;; everyone reads this in full to understand structure -;; /tasks/{storm id}/{task id} ; just contains bolt id - -;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously -;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here - -;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously -;; /taskbeats/{storm id}/{ephemeral task id} - -;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown -;; master manipulates -;; /storms/{storm id} - -;; Zookeeper flows: - -;; Master: -;; job submit: -;; 1. 
read which nodes are available -;; 2. set up the worker/{storm}/{task} stuff (static) -;; 3. set assignments -;; 4. start storm - necessary in case master goes down, so that when it comes back up it can remember to take down the storm (2 states: on or off) - -;; Monitoring (or by checking when nodes go down or heartbeats aren't received): -;; 1. read assignment -;; 2. see which tasks/nodes are up -;; 3. make new assignment to fix any problems -;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments) - -;; the master's only possible watches are on ephemeral nodes and tasks, and maybe not even those - -;; Supervisor: -;; 1. monitor /storms/* and assignments -;; 2. local state about which workers are local -;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments -;; 4. when storm is off, monitor tasks for workers - when they all die or don't heartbeat, kill the process and cleanup - -;; Worker: -;; 1. On startup, start the tasks if the storm is on - -;; Task: -;; 1. monitor assignments, reroute when assignments change -;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off - -;; locally on supervisor: workers write pids locally on startup, supervisor deletes them on shutdown (associates pid with worker name) -;; supervisor periodically checks to make sure processes are alive -;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside - -;; all tasks in a worker share the same cluster state -;; workers, supervisors, and tasks subscribe to the storm to know when it's started or stopped -;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear) -;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9) -;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. 
when node disappears, they can die http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj deleted file mode 100644 index fa36240..0000000 --- a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj +++ /dev/null @@ -1,161 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
- -(ns backtype.storm.cluster-state.zookeeper-state-factory - (:import [org.apache.curator.framework.state ConnectionStateListener]) - (:import [org.apache.zookeeper KeeperException$NoNodeException] - [backtype.storm.cluster ClusterState DaemonType]) - (:use [backtype.storm cluster config log util]) - (:require [backtype.storm [zookeeper :as zk]]) - (:gen-class - :implements [backtype.storm.cluster.ClusterStateFactory])) - -(defn -mkState [this conf auth-conf acls context] - (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)] - (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls) - (.close zk)) - (let [callbacks (atom {}) - active (atom true) - zk-writer (zk/mk-client conf - (conf STORM-ZOOKEEPER-SERVERS) - (conf STORM-ZOOKEEPER-PORT) - :auth-conf auth-conf - :root (conf STORM-ZOOKEEPER-ROOT) - :watcher (fn [state type path] - (when @active - (when-not (= :connected state) - (log-warn "Received event " state ":" type ":" path " with disconnected Writer Zookeeper.")) - (when-not (= :none type) - (doseq [callback (vals @callbacks)] - (callback type path)))))) - is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS) - zk-reader (if is-nimbus? - (zk/mk-client conf - (conf STORM-ZOOKEEPER-SERVERS) - (conf STORM-ZOOKEEPER-PORT) - :auth-conf auth-conf - :root (conf STORM-ZOOKEEPER-ROOT) - :watcher (fn [state type path] - (when @active - (when-not (= :connected state) - (log-warn "Received event " state ":" type ":" path " with disconnected Reader Zookeeper.")) - (when-not (= :none type) - (doseq [callback (vals @callbacks)] - (callback type path)))))) - zk-writer)] - (reify - ClusterState - - (register - [this callback] - (let [id (uuid)] - (swap! callbacks assoc id callback) - id)) - - (unregister - [this id] - (swap! 
callbacks dissoc id)) - - (set-ephemeral-node - [this path data acls] - (zk/mkdirs zk-writer (parent-path path) acls) - (if (zk/exists zk-writer path false) - (try-cause - (zk/set-data zk-writer path data) ; should verify that it's ephemeral - (catch KeeperException$NoNodeException e - (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data") - (zk/create-node zk-writer path data :ephemeral acls))) - (zk/create-node zk-writer path data :ephemeral acls))) - - (create-sequential - [this path data acls] - (zk/create-node zk-writer path data :sequential acls)) - - (set-data - [this path data acls] - ;; note: this does not turn off any existing watches - (if (zk/exists zk-writer path false) - (zk/set-data zk-writer path data) - (do - (zk/mkdirs zk-writer (parent-path path) acls) - (zk/create-node zk-writer path data :persistent acls)))) - - (set-worker-hb - [this path data acls] - (.set_data this path data acls)) - - (delete-node - [this path] - (zk/delete-node zk-writer path)) - - (delete-worker-hb - [this path] - (.delete_node this path)) - - (get-data - [this path watch?] - (zk/get-data zk-reader path watch?)) - - (get-data-with-version - [this path watch?] - (zk/get-data-with-version zk-reader path watch?)) - - (get-version - [this path watch?] - (zk/get-version zk-reader path watch?)) - - (get-worker-hb - [this path watch?] - (.get_data this path watch?)) - - (get-children - [this path watch?] - (zk/get-children zk-reader path watch?)) - - (get-worker-hb-children - [this path watch?] - (.get_children this path watch?)) - - (mkdirs - [this path acls] - (zk/mkdirs zk-writer path acls)) - - (node-exists - [this path watch?] - (zk/exists-node? 
zk-reader path watch?)) - - (add-listener - [this listener] - (let [curator-listener (reify ConnectionStateListener - (stateChanged - [this client newState] - (.stateChanged listener client newState)))] - (zk/add-listener zk-reader curator-listener))) - - (sync-path - [this path] - (zk/sync-path zk-writer path)) - - (delete-node-blobstore - [this path nimbus-host-port-info] - (zk/delete-node-blobstore zk-writer path nimbus-host-port-info)) - - (close - [this] - (reset! active false) - (.close zk-writer) - (if is-nimbus? - (.close zk-reader)))))) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/activate.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/activate.clj b/storm-core/src/clj/backtype/storm/command/activate.clj deleted file mode 100644 index 500e981..0000000 --- a/storm-core/src/clj/backtype/storm/command/activate.clj +++ /dev/null @@ -1,24 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.command.activate - (:use [backtype.storm thrift log]) - (:gen-class)) - -(defn -main [name] - (with-configured-nimbus-connection nimbus - (.activate nimbus name) - (log-message "Activated topology: " name) - )) http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/blobstore.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/blobstore.clj b/storm-core/src/clj/backtype/storm/command/blobstore.clj deleted file mode 100644 index ae7f919..0000000 --- a/storm-core/src/clj/backtype/storm/command/blobstore.clj +++ /dev/null @@ -1,162 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns backtype.storm.command.blobstore - (:import [java.io InputStream OutputStream] - [backtype.storm.generated SettableBlobMeta AccessControl AuthorizationException - KeyNotFoundException] - [backtype.storm.blobstore BlobStoreAclHandler]) - (:use [backtype.storm config] - [clojure.string :only [split]] - [clojure.tools.cli :only [cli]] - [clojure.java.io :only [copy input-stream output-stream]] - [backtype.storm blobstore log util]) - (:gen-class)) - -(defn update-blob-from-stream - "Update a blob in the blob store from an InputStream" - [key ^InputStream in] - (with-configured-blob-client blobstore - (let [out (.updateBlob blobstore key)] - (try - (copy in out) - (.close out) - (catch Exception e - (log-message e) - (.cancel out) - (throw e)))))) - -(defn create-blob-from-stream - "Create a blob in the blob store from an InputStream" - [key ^InputStream in ^SettableBlobMeta meta] - (with-configured-blob-client blobstore - (let [out (.createBlob blobstore key meta)] - (try - (copy in out) - (.close out) - (catch Exception e - (.cancel out) - (throw e)))))) - -(defn read-blob - "Read a blob in the blob store and write to an OutputStream" - [key ^OutputStream out] - (with-configured-blob-client blobstore - (with-open [in (.getBlob blobstore key)] - (copy in out)))) - -(defn as-access-control - "Convert a parameter to an AccessControl object" - [param] - (BlobStoreAclHandler/parseAccessControl (str param))) - -(defn as-acl - [param] - (map as-access-control (split param #","))) - -(defn access-control-str - [^AccessControl acl] - (BlobStoreAclHandler/accessControlToString acl)) - -(defn read-cli [args] - (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] - (if file - (with-open [f (output-stream file)] - (read-blob key f)) - (read-blob key System/out)))) - -(defn update-cli [args] - (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] - (if file - (with-open [f (input-stream file)] - (update-blob-from-stream key f)) - 
(update-blob-from-stream key System/in)) - (log-message "Successfully updated " key))) - -(defn create-cli [args] - (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil] - ["-a" "--acl" :default [] :parse-fn as-acl] - ["-r" "--replication-factor" :default -1 :parse-fn parse-int]) - meta (doto (SettableBlobMeta. acl) - (.set_replication_factor replication-factor))] - (validate-key-name! key) - (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl))) - (if file - (with-open [f (input-stream file)] - (create-blob-from-stream key f meta)) - (create-blob-from-stream key System/in meta)) - (log-message "Successfully created " key))) - -(defn delete-cli [args] - (with-configured-blob-client blobstore - (doseq [key args] - (.deleteBlob blobstore key) - (log-message "deleted " key)))) - -(defn list-cli [args] - (with-configured-blob-client blobstore - (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)] - (doseq [key keys] - (try - (let [meta (.getBlobMeta blobstore key) - version (.get_version meta) - acl (.get_acl (.get_settable meta))] - (log-message key " " version " " (pr-str (map access-control-str acl)))) - (catch AuthorizationException ae - (if-not (empty? args) (log-error "ACCESS DENIED to key: " key))) - (catch KeyNotFoundException knf - (if-not (empty? args) (log-error key " NOT FOUND")))))))) - -(defn set-acl-cli [args] - (let [[{set-acl :set} [key] _] - (cli args ["-s" "--set" :default [] :parse-fn as-acl])] - (with-configured-blob-client blobstore - (let [meta (.getBlobMeta blobstore key) - acl (.get_acl (.get_settable meta)) - new-acl (if set-acl set-acl acl) - new-meta (SettableBlobMeta. 
new-acl)] - (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl))) - (.setBlobMeta blobstore key new-meta))))) - -(defn rep-cli [args] - (let [sub-command (first args) - new-args (rest args)] - (with-configured-blob-client blobstore - (condp = sub-command - "--read" (let [key (first new-args) - blob-replication (.getBlobReplication blobstore key)] - (log-message "Current replication factor " blob-replication) - blob-replication) - "--update" (let [[{replication-factor :replication-factor} [key] _] - (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])] - (if (nil? replication-factor) - (throw (RuntimeException. (str "Please set the replication factor"))) - (let [blob-replication (.updateBlobReplication blobstore key replication-factor)] - (log-message "Replication factor is set to " blob-replication) - blob-replication))) - :else (throw (RuntimeException. (str sub-command " is not a supported blobstore command"))))))) - -(defn -main [& args] - (let [command (first args) - new-args (rest args)] - (condp = command - "cat" (read-cli new-args) - "create" (create-cli new-args) - "update" (update-cli new-args) - "delete" (delete-cli new-args) - "list" (list-cli new-args) - "set-acl" (set-acl-cli new-args) - "replication" (rep-cli new-args) - :else (throw (RuntimeException. (str command " is not a supported blobstore command"))))))
