Hi Madhan, The build errors only occur with JDK 1.7, which is why I didn't spot them. Upgrading Storm from 1.0.0 -> 1.0.2 fixes the problem, which suggests that the build errors are due to a bug in Storm. Any objections if I perform this upgrade?
Colm. On Mon, Aug 29, 2016 at 6:47 PM, Madhan Neethiraj < [email protected]> wrote: > Colm, > > Afer this commit, master build fails with the following error. Can you > please review? > > Thanks, > Madhan > > 19276 [Thread-12] ERROR o.a.s.event - Error when processing event > org.apache.storm.generated.AuthorizationException > at > org.apache.storm.blobstore.BlobStoreAclHandler.hasPermissions(BlobStoreAclHandler.java:292) > ~[storm-core-1.0.0.jar:1.0.0] > at > org.apache.storm.blobstore.LocalFsBlobStore.getBlob(LocalFsBlobStore.java:240) > ~[storm-core-1.0.0.jar:1.0.0] > at org.apache.storm.blobstore.BlobStore.readBlobTo(BlobStore.java:271) > ~[storm-core-1.0.0.jar:1.0.0] > at > org.apache.storm.daemon.supervisor$fn__9351$fn__9352.invoke(supervisor.clj:1176) > ~[storm-core-1.0.0.jar:1.0.0] > at > org.apache.storm.daemon.supervisor$fn__9351.invoke(supervisor.clj:1172) > ~[storm-core-1.0.0.jar:1.0.0] > at clojure.lang.MultiFn.invoke(MultiFn.java:243) > ~[clojure-1.7.0.jar:?] > at org.apache.storm.daemon.supervisor$mk_synchronize_ > supervisor$this__9070$fn__9088.invoke(supervisor.clj:582) > ~[storm-core-1.0.0.jar:1.0.0] > at org.apache.storm.daemon.supervisor$mk_synchronize_ > supervisor$this__9070.invoke(supervisor.clj:581) > ~[storm-core-1.0.0.jar:1.0.0] > at org.apache.storm.event$event_manager$fn__8622.invoke(event.clj:40) > [storm-core-1.0.0.jar:1.0.0] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:745) [?:1.7.0_71] > 19277 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl > - backgroundOperationsLoop exiting > 19278 [ProcessThread(sid:0 cport:-1):] INFO > o.a.s.s.o.a.z.s.PrepRequestProcessor > - Processed session termination for sessionid: 0x156d760f67c0010 > 19279 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x156d760f67c0010 > closed > 19279 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut > down > 19279 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO > o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client > /fe80:0:0:0:0:0:0:1%1:56076 which had sessionid 0x156d760f67c0010 > 19279 [main] INFO o.a.s.cluster - setup-path/blobstore/temp2-2- > 1472492282-stormcode.ser/10.22.8.99:6627-1 > 19283 [Thread-12] ERROR o.a.s.util - Halting process: ("Error when > processing an event") > java.lang.RuntimeException: ("Error when processing an event") > at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) > [storm-core-1.0.0.jar:1.0.0] > at clojure.lang.RestFn.invoke(RestFn.java:423) > [clojure-1.7.0.jar:?] > at org.apache.storm.event$event_manager$fn__8622.invoke(event.clj:48) > [storm-core-1.0.0.jar:1.0.0] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:745) [?:1.7.0_71] > > Results : > > Tests run: 0, Failures: 0, Errors: 0, Skipped: 0 > > > > > > On 8/29/16, 7:25 AM, "[email protected]" <[email protected]> wrote: > > Repository: incubator-ranger > Updated Branches: > refs/heads/master b15d6fa8a -> 7729373de > > > RANGER-1133 - Add tests for the Storm plugin > > Signed-off-by: Selvamohan Neethiraj <[email protected]> > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo > Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > commit/7729373d > Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > tree/7729373d > Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > diff/7729373d > > Branch: refs/heads/master > Commit: 7729373dea5a560f5cb4b24433d12b25a9dc7519 > Parents: b15d6fa > Author: Colm O hEigeartaigh <[email protected]> > Authored: Tue Aug 2 12:25:55 2016 +0100 > Committer: Selvamohan Neethiraj <[email protected]> > Committed: Mon Aug 29 10:13:59 2016 -0400 > > ---------------------------------------------------------------------- > .../ranger/plugin/util/RangerResourceTrie.java | 2 +- > .../security/listener/SpringEventListener.java | 3 +- > storm-agent/pom.xml | 27 +- > .../storm/RangerAdminClientImpl.java | 84 +++++ > .../storm/StormRangerAuthorizerTest.java | 190 +++++++++++ > .../authorization/storm/WordCounterBolt.java | 66 ++++ > .../ranger/authorization/storm/WordSpout.java | 68 ++++ > .../test/resources/ranger-storm-security.xml | 45 +++ > .../src/test/resources/storm-policies.json | 337 > +++++++++++++++++++ > storm-agent/src/test/resources/storm.yaml | 289 > ++++++++++++++++ > storm-agent/src/test/resources/words.txt | 27 ++ > 11 files changed, 1132 insertions(+), 6 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/agents-common/src/main/java/org/apache/ranger/plugin/util/ > RangerResourceTrie.java > ---------------------------------------------------------------------- > diff --git a/agents-common/src/main/java/ > org/apache/ranger/plugin/util/RangerResourceTrie.java > b/agents-common/src/main/java/org/apache/ranger/plugin/util/ > RangerResourceTrie.java > index 809c07e..a5ffd1a 100644 > --- a/agents-common/src/main/java/org/apache/ranger/plugin/util/ > RangerResourceTrie.java > +++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/ > RangerResourceTrie.java > @@ -59,7 +59,7 @@ public class RangerResourceTrie { > > this.resourceName = resourceDef.getName(); > this.optIgnoreCase = strIgnoreCase != null ? > Boolean.parseBoolean(strIgnoreCase) : false; > - this.optWildcard = strWildcard != null ? > Boolean.parseBoolean(strWildcard) : false;; > + this.optWildcard = strWildcard != null ? > Boolean.parseBoolean(strWildcard) : false; > this.wildcardChars = optWildcard ? DEFAULT_WILDCARD_CHARS : > ""; > this.root = new TrieNode(Character.valueOf(( > char)0)); > > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/security-admin/src/main/java/org/apache/ > ranger/security/listener/SpringEventListener.java > ---------------------------------------------------------------------- > diff --git a/security-admin/src/main/java/org/apache/ranger/ > security/listener/SpringEventListener.java b/security-admin/src/main/ > java/org/apache/ranger/security/listener/SpringEventListener.java > index 5876445..29a35cf 100644 > --- a/security-admin/src/main/java/org/apache/ranger/ > security/listener/SpringEventListener.java > +++ b/security-admin/src/main/java/org/apache/ranger/ > security/listener/SpringEventListener.java > @@ -50,9 +50,8 @@ public class SpringEventListener implements > process((AuthenticationFailureBadCredentialsEvent) event); > } else if (event instanceof AuthenticationFailureDisabledEvent) > { > process((AuthenticationFailureDisabledEvent) event); > - } else { > - // igonre all other events > } > + // igonre all other events > > } catch (Exception e) { > logger.error("Exception in Spring Event Listener.", e); > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/pom.xml > ---------------------------------------------------------------------- > diff --git a/storm-agent/pom.xml b/storm-agent/pom.xml > index b975a92..d49c2fe 100644 > --- a/storm-agent/pom.xml > +++ b/storm-agent/pom.xml > @@ -35,6 +35,12 @@ > <groupId>org.apache.storm</groupId> > <artifactId>storm-core</artifactId> > <version>${storm.version}</version> > + <exclusions> > + <exclusion> > + <groupId>org.slf4j</groupId> > + <artifactId>log4j-over-slf4j</artifactId> > + </exclusion> > + </exclusions> > </dependency> > <dependency> > <groupId>org.apache.ranger</groupId> > @@ -93,11 +99,26 @@ > <groupId>org.apache.httpcomponents</groupId> > <artifactId>httpcore</artifactId> > <version>${httpcomponents. > httpcore.version}</version> > - </dependency> > - <dependency> > + </dependency> > + <dependency> > <groupId>commons-codec</groupId> > <artifactId>commons-codec</artifactId> > <version>${commons.codec.version}</version> > - </dependency> > + </dependency> > + <dependency> > + <groupId>junit</groupId> > + <artifactId>junit</artifactId> > + </dependency> > </dependencies> > + <build> > + <testResources> > + <testResource> > + <directory>src/test/resources</directory> > + <includes> > + <include>**/*</include> > + </includes> > + <filtering>true</filtering> > + </testResource> > + </testResources> > + </build> > </project> > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/RangerAdminClientImpl.java > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/RangerAdminClientImpl.java > b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/ > RangerAdminClientImpl.java > new file mode 100644 > index 0000000..e6c289a > --- /dev/null > +++ b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/ > RangerAdminClientImpl.java > @@ -0,0 +1,84 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed > with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, > Version 2.0 > + * (the "License"); you may not use this file except in compliance > with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.ranger.authorization.storm; > + > +import java.io.File; > +import java.nio.file.FileSystems; > +import java.nio.file.Files; > +import java.util.List; > + > +import org.apache.ranger.admin.client.RangerAdminClient; > +import org.apache.ranger.plugin.util.GrantRevokeRequest; > +import org.apache.ranger.plugin.util.ServicePolicies; > +import org.apache.ranger.plugin.util.ServiceTags; > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > +import com.google.gson.Gson; > +import com.google.gson.GsonBuilder; > + > +/** > + * A test implementation of the RangerAdminClient interface that just > reads policies in from a file and returns them > + */ > +public class RangerAdminClientImpl implements RangerAdminClient { > + private static final Logger LOG = LoggerFactory.getLogger( > RangerAdminClientImpl.class); > + private final static String cacheFilename = "storm-policies.json"; > + private Gson gson; > + > + public void init(String serviceName, String appId, String > configPropertyPrefix) { > + Gson gson = null; > + try { > + gson = new GsonBuilder().setDateFormat(" > yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); > + } catch(Throwable excp) { > + LOG.error("RangerAdminClientImpl: failed to create > GsonBuilder object", excp); > + } > + this.gson = gson; > + } > + > + public ServicePolicies getServicePoliciesIfUpdated(long > lastKnownVersion) throws Exception { > + > + String basedir = System.getProperty("basedir"); > + if (basedir == null) { > + basedir = new File(".").getCanonicalPath(); > + } > + > + java.nio.file.Path cachePath = > FileSystems.getDefault().getPath(basedir, > "/src/test/resources/" + cacheFilename); > + byte[] cacheBytes = Files.readAllBytes(cachePath); > + > + return gson.fromJson(new String(cacheBytes), > ServicePolicies.class); > + } > + > + public void grantAccess(GrantRevokeRequest request) throws > Exception { > + > + } > + > + public void revokeAccess(GrantRevokeRequest request) throws > Exception { > + > + } > + > + public ServiceTags getServiceTagsIfUpdated(long lastKnownVersion) > throws Exception { > + return null; > + > + } > + > + public List<String> getTagTypes(String tagTypePattern) throws > Exception { > + return null; > + } > + > + > +} > \ No newline at end of file > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/StormRangerAuthorizerTest.java > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/StormRangerAuthorizerTest.java > b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/ > StormRangerAuthorizerTest.java > new file mode 100644 > index 0000000..0c249c5 > --- /dev/null > +++ b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/ > StormRangerAuthorizerTest.java > @@ -0,0 +1,190 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed > with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, > Version 2.0 > + * (the "License"); you may not use this file except in compliance > with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.ranger.authorization.storm; > + > +import java.security.Principal; > +import java.security.PrivilegedExceptionAction; > + > +import javax.security.auth.Subject; > + > +import org.apache.storm.Config; > +import org.apache.storm.LocalCluster; > +import org.apache.storm.generated.RebalanceOptions; > +import org.apache.storm.topology.TopologyBuilder; > +import org.junit.Assert; > + > +/** > + * A simple test that wires a WordSpout + WordCounterBolt into a > topology and runs it. The "RangerStormAuthorizer" takes care of > authorization. > + * The policies state that "bob" can do anything with the > "word-count" topology. In addition, "bob" can create/kill the "temp*" > topologies, but do > + * nothing else. > + */ > +public class StormRangerAuthorizerTest { > + > + private static LocalCluster cluster; > + > + @org.junit.BeforeClass > + public static void setup() throws Exception { > + cluster = new LocalCluster(); > + > + final Config conf = new Config(); > + conf.setDebug(true); > + > + final TopologyBuilder builder = new TopologyBuilder(); > + builder.setSpout("words", new WordSpout()); > + builder.setBolt("counter", new WordCounterBolt()). > shuffleGrouping("words"); > + > + // bob can create a new topology > + final Subject subject = new Subject(); > + subject.getPrincipals().add(new SimplePrincipal("bob")); > + Subject.doAs(subject, new PrivilegedExceptionAction<Void>() { > + public Void run() throws Exception { > + cluster.submitTopology("word-count", conf, > builder.createTopology()); > + return null; > + } > + }); > + > + } > + > + @org.junit.AfterClass > + public static void cleanup() throws Exception { > + final Subject subject = new Subject(); > + subject.getPrincipals().add(new SimplePrincipal("bob")); > + Subject.doAs(subject, new PrivilegedExceptionAction<Void>() { > + public Void run() throws Exception { > + cluster.killTopology("word-count"); > + return null; > + } > + }); > + > + cluster.shutdown(); > + System.clearProperty("storm.conf.file"); > + } > + > + // "bob" can't create topologies other than "word-count" and > "temp*" > + @org.junit.Test > + public void testCreateTopologyBob() throws Exception { > + final Config conf = new Config(); > + conf.setDebug(true); > + > + final TopologyBuilder builder = new TopologyBuilder(); > + builder.setSpout("words", new WordSpout()); > + builder.setBolt("counter", new WordCounterBolt()). > shuffleGrouping("words"); > + > + final Subject subject = new Subject(); > + subject.getPrincipals().add(new SimplePrincipal("bob")); > + Subject.doAs(subject, new PrivilegedExceptionAction<Void>() { > + public Void run() throws Exception { > + try { > + cluster.submitTopology("word-count2", conf, > builder.createTopology()); > + Assert.fail("Authorization failure expected"); > + } catch (Throwable ex) { > + // expected > + } > + > + return null; > + } > + }); > + } > + > + @org.junit.Test > + public void testTopologyActivation() throws Exception { > + final Subject subject = new Subject(); > + subject.getPrincipals().add(new SimplePrincipal("bob")); > + Subject.doAs(subject, new PrivilegedExceptionAction<Void>() { > + public Void run() throws Exception { > + > + // Deactivate "word-count" > + cluster.deactivate("word-count"); > + > + // Create a new topology called "temp1" > + final Config conf = new Config(); > + conf.setDebug(true); > + > + final TopologyBuilder builder = new TopologyBuilder(); > + builder.setSpout("words", new WordSpout()); > + builder.setBolt("counter", new WordCounterBolt()). > shuffleGrouping("words"); > + cluster.submitTopology("temp1", conf, > builder.createTopology()); > + > + // Try to deactivate "temp1" > + try { > + cluster.deactivate("temp1"); > + Assert.fail("Authorization failure expected"); > + } catch (Throwable ex) { > + // expected > + } > + > + // Re-activate "word-count" > + cluster.activate("word-count"); > + > + // Kill temp1 > + cluster.killTopology("temp1"); > + > + return null; > + } > + }); > + } > + > + @org.junit.Test > + public void testTopologyRebalancing() throws Exception { > + final Subject subject = new Subject(); > + subject.getPrincipals().add(new SimplePrincipal("bob")); > + Subject.doAs(subject, new PrivilegedExceptionAction<Void>() { > + public Void run() throws Exception { > + RebalanceOptions options = new RebalanceOptions(); > + > + // Create a new topology called "temp2" > + final Config conf = new Config(); > + conf.setDebug(true); > + > + final TopologyBuilder builder = new TopologyBuilder(); > + builder.setSpout("words", new WordSpout()); > + builder.setBolt("counter", new WordCounterBolt()). > shuffleGrouping("words"); > + cluster.submitTopology("temp2", conf, > builder.createTopology()); > + > + // Try to rebalance "temp2" > + try { > + cluster.rebalance("temp2", options); > + Assert.fail("Authorization failure expected"); > + } catch (Throwable ex) { > + // expected > + } > + > + // Kill temp2 > + cluster.killTopology("temp2"); > + > + return null; > + } > + }); > + } > + > + > + private static class SimplePrincipal implements Principal { > + > + private final String name; > + > + public SimplePrincipal(String name) { > + this.name = name; > + } > + > + @Override > + public String getName() { > + return name; > + } > + > + } > +} > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/WordCounterBolt.java > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/WordCounterBolt.java b/storm-agent/src/test/java/ > org/apache/ranger/authorization/storm/WordCounterBolt.java > new file mode 100644 > index 0000000..0e327c7 > --- /dev/null > +++ b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/ > WordCounterBolt.java > @@ -0,0 +1,66 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed > with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, > Version 2.0 > + * (the "License"); you may not use this file except in compliance > with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.ranger.authorization.storm; > + > +import java.util.HashMap; > +import java.util.Map; > + > +import org.apache.storm.task.OutputCollector; > +import org.apache.storm.task.TopologyContext; > +import org.apache.storm.topology.OutputFieldsDeclarer; > +import org.apache.storm.topology.base.BaseRichBolt; > +import org.apache.storm.tuple.Fields; > +import org.apache.storm.tuple.Tuple; > +import org.apache.storm.tuple.Values; > + > +/** > + * A Storm Bolt which reads in a word and counts it + outputs the > word + current count > + */ > +public class WordCounterBolt extends BaseRichBolt { > + private OutputCollector outputCollector; > + private Map<String, Integer> countMap = new HashMap<>(); > + > + @Override > + public void execute(Tuple tuple) { > + String word = tuple.getString(0); > + > + int count = 0; > + if (countMap.containsKey(word)) { > + count = countMap.get(word); > + count++; > + } > + count++; > + countMap.put(word, count); > + > + outputCollector.emit(new Values(word, count)); > + outputCollector.ack(tuple); > + > + } > + > + @Override > + public void prepare(Map arg0, TopologyContext arg1, > OutputCollector outputCollector) { > + this.outputCollector = outputCollector; > + } > + > + @Override > + public void declareOutputFields(OutputFieldsDeclarer declarer) { > + declarer.declare(new Fields("word", "count")); > + } > + > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/WordSpout.java > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/WordSpout.java b/storm-agent/src/test/java/ > org/apache/ranger/authorization/storm/WordSpout.java > new file mode 100644 > index 0000000..5f0b2cf > --- /dev/null > +++ b/storm-agent/src/test/java/org/apache/ranger/ > authorization/storm/WordSpout.java > @@ -0,0 +1,68 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed > with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, > Version 2.0 > + * (the "License"); you may not use this file except in compliance > with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.ranger.authorization.storm; > + > +import java.util.List; > +import java.util.Map; > + > +import org.apache.storm.shade.org.apache.commons.io.IOUtils; > +import org.apache.storm.spout.SpoutOutputCollector; > +import org.apache.storm.task.TopologyContext; > +import org.apache.storm.topology.OutputFieldsDeclarer; > +import org.apache.storm.topology.base.BaseRichSpout; > +import org.apache.storm.tuple.Fields; > +import org.apache.storm.tuple.Values; > + > +/** > + * A Storm Spout which reads in words.txt + emits a word from it > (sequentially) > + */ > +public class WordSpout extends BaseRichSpout { > + private final List<String> words; > + private SpoutOutputCollector collector; > + private int line = 0; > + > + public WordSpout() throws Exception { > + java.io.File inputFile = new java.io.File(WordSpout.class. > getResource("../../../../../words.txt").toURI()); > + words = IOUtils.readLines(new java.io.FileInputStream( > inputFile)); > + } > + > + @Override > + public void nextTuple() { > + if (line < words.size()) { > + String lineVal = words.get(line++); > + while (lineVal.startsWith("#") && line < words.size()) { > + lineVal = words.get(line++); > + } > + if (lineVal != null) { > + collector.emit(new Values(lineVal.trim())); > + } > + } > + } > + > + @Override > + public void open(Map arg0, TopologyContext arg1, > SpoutOutputCollector collector) { > + this.collector = collector; > + } > + > + @Override > + public void declareOutputFields(OutputFieldsDeclarer declarer) { > + declarer.declare(new Fields("word")); > + } > + > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/resources/ranger-storm-security.xml > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/resources/ranger-storm-security.xml > b/storm-agent/src/test/resources/ranger-storm-security.xml > new file mode 100644 > index 0000000..adff2b9 > --- /dev/null > +++ b/storm-agent/src/test/resources/ranger-storm-security.xml > @@ -0,0 +1,45 @@ > +<?xml version="1.0"?> > +<!-- > + Licensed to the Apache Software Foundation (ASF) under one or more > + contributor license agreements. See the NOTICE file distributed > with > + this work for additional information regarding copyright ownership. > + The ASF licenses this file to You under the Apache License, Version > 2.0 > + (the "License"); you may not use this file except in compliance with > + the License. You may obtain a copy of the License at > + > + http://www.apache.org/licenses/LICENSE-2.0 > + > + Unless required by applicable law or agreed to in writing, software > + distributed under the License is distributed on an "AS IS" BASIS, > + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + See the License for the specific language governing permissions and > + limitations under the License. > +--> > +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> > +<configuration xmlns:xi="http://www.w3.org/2001/XInclude"> > + > + <property> > + <name>ranger.plugin.storm.service.name</name> > + <value>StormTest</value> > + <description> > + Name of the Ranger service containing policies for > this SampleApp instance > + </description> > + </property> > + > + <property> > + <name>ranger.plugin.storm.policy.source.impl</name> > + <value>org.apache.ranger.authorization.storm. > RangerAdminClientImpl</value> > + <description> > + Policy source. > + </description> > + </property> > + > + <property> > + <name>ranger.plugin.storm.policy.cache.dir</name> > + <value>${project.build.directory}</value> > + <description> > + Directory where Ranger policies are cached after > successful retrieval from the source > + </description> > + </property> > + > +</configuration> > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/resources/storm-policies.json > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/resources/storm-policies.json > b/storm-agent/src/test/resources/storm-policies.json > new file mode 100644 > index 0000000..5c04b5d > --- /dev/null > +++ b/storm-agent/src/test/resources/storm-policies.json > @@ -0,0 +1,337 @@ > +{ > + "serviceName": "StormTest", > + "serviceId": 10, > + "policyVersion": 12, > + "policyUpdateTime": "20160704-15:53:20.000-+0100", > + "policies": [ > + { > + "service": "StormTest", > + "name": "WordCount", > + "description": "", > + "resourceSignature": "25dc26943b5859a6e5f904388cd02830", > + "isAuditEnabled": true, > + "resources": { > + "topology": { > + "values": [ > + "word-count" > + ], > + "isExcludes": false, > + "isRecursive": false > + } > + }, > + "policyItems": [ > + { > + "accesses": [ > + { > + "type": "submitTopology", > + "isAllowed": true > + }, > + { > + "type": "fileUpload", > + "isAllowed": true > + }, > + { > + "type": "getNimbusConf", > + "isAllowed": true > + }, > + { > + "type": "getClusterInfo", > + "isAllowed": true > + }, > + { > + "type": "fileDownload", > + "isAllowed": true > + }, > + { > + "type": "killTopology", > + "isAllowed": true > + }, > + { > + "type": "rebalance", > + "isAllowed": true > + }, > + { > + "type": "activate", > + "isAllowed": true > + }, > + { > + "type": "deactivate", > + "isAllowed": true > + }, > + { > + "type": "getTopologyConf", > + "isAllowed": true > + }, > + { > + "type": "getTopology", > + "isAllowed": true > + }, > + { > + "type": "getUserTopology", > + "isAllowed": true > + }, > + { > + "type": "getTopologyInfo", > + "isAllowed": true > + }, > + { > + "type": "uploadNewCredentials", > + "isAllowed": true > + } > + ], > + "users": [ > + "bob" > + ], > + "groups": [], > + "conditions": [], > + "delegateAdmin": false > + } > + ], > + "denyPolicyItems": [], > + "allowExceptions": [], > + "denyExceptions": [], > + "dataMaskPolicyItems": [], > + "rowFilterPolicyItems": [], > + "id": 40, > + "guid": "1467386247700_275_2404", > + "isEnabled": true, > + "createdBy": "Admin", > + "updatedBy": "Admin", > + "createTime": "20160701-16:17:27.000-+0100", > + "updateTime": "20160704-14:51:01.000-+0100", > + "version": 6 > + }, > + { > + "service": "StormTest", > + "name": "TempPolicy", > + "description": "", > + "resourceSignature": "1e4cafdc98da3cec11b565ef03cfab14", > + "isAuditEnabled": true, > + "resources": { > + "topology": { > + "values": [ > + "temp*" > + ], > + "isExcludes": false, > + "isRecursive": false > + } > + }, > + "policyItems": [ > + { > + "accesses": [ > + { > + "type": "submitTopology", > + "isAllowed": true > + }, > + { > + "type": "getClusterInfo", > + "isAllowed": true > + }, > + { > + "type": "killTopology", > + "isAllowed": true > + } > + ], > + "users": [ > + "bob" > + ], > + "groups": [], > + "conditions": [], > + "delegateAdmin": false > + } > + ], > + "denyPolicyItems": [], > + "allowExceptions": [], > + "denyExceptions": [], > + "dataMaskPolicyItems": [], > + "rowFilterPolicyItems": [], > + "id": 42, > + "guid": "1467641649473_569_2619", > + "isEnabled": true, > + "createdBy": "Admin", > + "updatedBy": "Admin", > + "createTime": "20160704-15:14:09.000-+0100", > + "updateTime": "20160704-15:53:20.000-+0100", > + "version": 5 > + } > + ], > + "serviceDef": { > + "name": "storm", > + "implClass": "org.apache.ranger.services. > storm.RangerServiceStorm", > + "label": "Storm", > + "description": "Storm", > + "options": {}, > + "configs": [ > + { > + "itemId": 1, > + "name": "username", > + "type": "string", > + "mandatory": true, > + "validationRegEx": "", > + "validationMessage": "", > + "uiHint": "", > + "label": "Username" > + }, > + { > + "itemId": 2, > + "name": "password", > + "type": "password", > + "mandatory": true, > + "validationRegEx": "", > + "validationMessage": "", > + "uiHint": "", > + "label": "Password" > + }, > + { > + "itemId": 3, > + "name": "nimbus.url", > + "type": "string", > + "mandatory": true, > + "defaultValue": "", > + "validationRegEx": "", > + "validationMessage": "", > + "uiHint": "", > + "label": "Nimbus URL" > + }, > + { > + "itemId": 4, > + "name": "commonNameForCertificate", > + "type": "string", > + "mandatory": false, > + "validationRegEx": "", > + "validationMessage": "", > + "uiHint": "", > + "label": "Common Name for Certificate" > + } > + ], > + "resources": [ > + { > + "itemId": 1, > + "name": "topology", > + "type": "string", > + "level": 10, > + "mandatory": true, > + "lookupSupported": true, > + "recursiveSupported": false, > + "excludesSupported": true, > + "matcher": "org.apache.ranger.plugin.resourcematcher. > RangerDefaultResourceMatcher", > + "matcherOptions": { > + "wildCard": "true", > + "ignoreCase": "false" > + }, > + "validationRegEx": "", > + "validationMessage": "", > + "uiHint": "", > + "label": "Storm Topology", > + "description": "Storm Topology" > + } > + ], > + "accessTypes": [ > + { > + "itemId": 1, > + "name": "submitTopology", > + "label": "Submit Topology", > + "impliedGrants": [ > + "fileUpload", > + "fileDownload" > + ] > + }, > + { > + "itemId": 2, > + "name": "fileUpload", > + "label": "File Upload", > + "impliedGrants": [] > + }, > + { > + "itemId": 3, > + "name": "getNimbusConf", > + "label": "Get Nimbus Conf", > + "impliedGrants": [] > + }, > + { > + "itemId": 4, > + "name": "getClusterInfo", > + "label": "Get Cluster Info", > + "impliedGrants": [] > + }, > + { > + "itemId": 5, > + "name": "fileDownload", > + "label": "File Download", > + "impliedGrants": [] > + }, > + { > + "itemId": 6, > + "name": "killTopology", > + "label": "Kill Topology", > + "impliedGrants": [] > + }, > + { > + "itemId": 7, > + "name": "rebalance", > + "label": "Rebalance", > + "impliedGrants": [] > + }, > + { > + "itemId": 8, > + "name": "activate", > + "label": "Activate", > + "impliedGrants": [] > + }, > + { > + "itemId": 9, > + "name": "deactivate", > + "label": "Deactivate", > + "impliedGrants": [] > + }, > + { > + "itemId": 10, > + "name": "getTopologyConf", > + "label": "Get Topology Conf", > + "impliedGrants": [] > + }, > + { > + "itemId": 11, > + "name": "getTopology", > + "label": "Get Topology", > + "impliedGrants": [] > + }, > + { > + "itemId": 12, > + "name": "getUserTopology", > + "label": "Get User Topology", > + "impliedGrants": [] > + }, > + { > + "itemId": 13, > + "name": "getTopologyInfo", > + "label": "Get Topology Info", > + "impliedGrants": [] > + }, > + { > + "itemId": 14, > + "name": "uploadNewCredentials", > + "label": "Upload New Credential", > + "impliedGrants": [] > + } > + ], > + "policyConditions": [], > + "contextEnrichers": [], > + "enums": [], > + "dataMaskDef": { > + "maskTypes": [], > + "accessTypes": [], > + "resources": [] > + }, > + "rowFilterDef": { > + "accessTypes": [], > + "resources": [] > + }, > + "id": 6, > + "guid": "2a60f427-edcf-4e20-834c-a9a267b5b963", > + "isEnabled": true, > + "createTime": "20160314-14:39:35.000-+0000", > + "updateTime": "20160314-14:39:35.000-+0000", > + "version": 1 > + } > +} > \ No newline at end of file > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/resources/storm.yaml > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/resources/storm.yaml > b/storm-agent/src/test/resources/storm.yaml > new file mode 100644 > index 0000000..a306966 > --- /dev/null > +++ b/storm-agent/src/test/resources/storm.yaml > @@ -0,0 +1,289 @@ > +# Licensed to the Apache Software Foundation (ASF) under one > +# or more contributor license agreements. See the NOTICE file > +# distributed with this work for additional information > +# regarding copyright ownership. The ASF licenses this file > +# to you under the Apache License, Version 2.0 (the > +# "License"); you may not use this file except in compliance > +# with the License. You may obtain a copy of the License at > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, software > +# distributed under the License is distributed on an "AS IS" BASIS, > +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > +# See the License for the specific language governing permissions and > +# limitations under the License. > + > + > +########### These all have default values as shown > +########### Additional configuration goes into storm.yaml > + > +java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib" > + > +### storm.* configs are general configurations > +# the local dir is where jars are kept > +storm.local.dir: "storm-local" > +storm.log4j2.conf.dir: "log4j2" > +storm.zookeeper.servers: > + - "localhost" > +storm.zookeeper.port: 2181 > +storm.zookeeper.root: "/storm" > +storm.zookeeper.session.timeout: 20000 > +storm.zookeeper.connection.timeout: 15000 > +storm.zookeeper.retry.times: 5 > +storm.zookeeper.retry.interval: 1000 > +storm.zookeeper.retry.intervalceiling.millis: 30000 > +storm.zookeeper.auth.user: null > +storm.zookeeper.auth.password: null > +storm.exhibitor.port: 8080 > +storm.exhibitor.poll.uripath: "/exhibitor/v1/cluster/list" > +storm.cluster.mode: "distributed" # can be distributed or local > +storm.local.mode.zmq: false > +storm.thrift.transport: "org.apache.storm.security. > auth.SimpleTransportPlugin" > +storm.principal.tolocal: "org.apache.storm.security. > auth.DefaultPrincipalToLocal" > +storm.group.mapping.service: "org.apache.storm.security. > auth.ShellBasedGroupsMapping" > +storm.group.mapping.service.params: null > +storm.messaging.transport: "org.apache.storm.messaging.netty.Context" > +storm.nimbus.retry.times: 5 > +storm.nimbus.retry.interval.millis: 2000 > +storm.nimbus.retry.intervalceiling.millis: 60000 > +storm.auth.simple-white-list.users: [] > +storm.auth.simple-acl.users: [] > +storm.auth.simple-acl.users.commands: [] > +storm.auth.simple-acl.admins: [] > +storm.cluster.state.store: "org.apache.storm.cluster_ > state.zookeeper_state_factory" > +storm.meta.serialization.delegate: "org.apache.storm.serialization. > GzipThriftSerializationDelegate" > +storm.codedistributor.class: "org.apache.storm.codedistributor. > LocalFileSystemCodeDistributor" > +storm.workers.artifacts.dir: "workers-artifacts" > +storm.health.check.dir: "healthchecks" > +storm.health.check.timeout.ms: 5000 > + > +### nimbus.* configs are for the master > +nimbus.seeds : ["localhost"] > +nimbus.thrift.port: 6627 > +nimbus.thrift.threads: 64 > +nimbus.thrift.max_buffer_size: 1048576 > +nimbus.childopts: "-Xmx1024m" > +nimbus.task.timeout.secs: 30 > +nimbus.supervisor.timeout.secs: 60 > +nimbus.monitor.freq.secs: 10 > +nimbus.cleanup.inbox.freq.secs: 600 > +nimbus.inbox.jar.expiration.secs: 3600 > +nimbus.code.sync.freq.secs: 120 > +nimbus.task.launch.secs: 120 > +nimbus.file.copy.expiration.secs: 600 > +nimbus.topology.validator: "org.apache.storm.nimbus. > DefaultTopologyValidator" > +topology.min.replication.count: 1 > +topology.max.replication.wait.time.sec: 60 > +nimbus.credential.renewers.freq.secs: 600 > +nimbus.impersonation.authorizer: "org.apache.storm.security. > auth.authorizer.ImpersonationAuthorizer" > +# Plug in ranger nimbus.authorizer here > +nimbus.authorizer: "org.apache.ranger.authorization.storm.authorizer. > RangerStormAuthorizer" > +nimbus.queue.size: 100000 > +scheduler.display.resource: false > + > +### ui.* configs are for the master > +ui.host: 0.0.0.0 > +ui.port: 8080 > +ui.childopts: "-Xmx768m" > +ui.actions.enabled: true > +ui.filter: null > +ui.filter.params: null > +ui.users: null > +ui.header.buffer.bytes: 4096 > +ui.http.creds.plugin: org.apache.storm.security.auth. > DefaultHttpCredentialsPlugin > + > +logviewer.port: 8000 > +logviewer.childopts: "-Xmx128m" > +logviewer.cleanup.age.mins: 10080 > +logviewer.appender.name: "A1" > +logviewer.max.sum.worker.logs.size.mb: 4096 > +logviewer.max.per.worker.logs.size.mb: 2048 > + > +logs.users: null > + > +drpc.port: 3772 > +drpc.worker.threads: 64 > +drpc.max_buffer_size: 1048576 > +drpc.queue.size: 128 > +drpc.invocations.port: 3773 > +drpc.invocations.threads: 64 > +drpc.request.timeout.secs: 600 > +drpc.childopts: "-Xmx768m" > +drpc.http.port: 3774 > +drpc.https.port: -1 > +drpc.https.keystore.password: "" > +drpc.https.keystore.type: "JKS" > +drpc.http.creds.plugin: org.apache.storm.security.auth. > DefaultHttpCredentialsPlugin > +drpc.authorizer.acl.filename: "drpc-auth-acl.yaml" > +drpc.authorizer.acl.strict: false > + > +transactional.zookeeper.root: "/transactional" > +transactional.zookeeper.servers: null > +transactional.zookeeper.port: null > + > +## blobstore configs > +supervisor.blobstore.class: "org.apache.storm.blobstore. > NimbusBlobStore" > +supervisor.blobstore.download.thread.count: 5 > +supervisor.blobstore.download.max_retries: 3 > +supervisor.localizer.cache.target.size.mb: 10240 > +supervisor.localizer.cleanup.interval.ms: 600000 > + > +nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore" > +nimbus.blobstore.expiration.secs: 600 > + > +storm.blobstore.inputstream.buffer.size.bytes: 65536 > +client.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore" > +storm.blobstore.replication.factor: 3 > + > +### supervisor.* configs are for node supervisors > +# Define the amount of workers that can be run on this machine. Each > worker is assigned a port to use for communication > +supervisor.slots.ports: > + - 6700 > + - 6701 > + - 6702 > + - 6703 > +supervisor.childopts: "-Xmx256m" > +supervisor.run.worker.as.user: false > +#how long supervisor will wait to ensure that a worker process is > started > +supervisor.worker.start.timeout.secs: 120 > +#how long between heartbeats until supervisor considers that worker > dead and tries to restart it > +supervisor.worker.timeout.secs: 30 > +#how many seconds to sleep for before shutting down threads on worker > +supervisor.worker.shutdown.sleep.secs: 1 > +#how frequently the supervisor checks on the status of the processes > it's monitoring and restarts if necessary > +supervisor.monitor.frequency.secs: 3 > +#how frequently the supervisor heartbeats to the cluster state (for > nimbus) > +supervisor.heartbeat.frequency.secs: 5 > +supervisor.enable: true > +supervisor.supervisors: [] > +supervisor.supervisors.commands: [] > +supervisor.memory.capacity.mb: 3072.0 > +#By convention 1 cpu core should be about 100, but this can be > adjusted if needed > +# using 100 makes it simple to set the desired value to the capacity > measurement > +# for single threaded bolts > +supervisor.cpu.capacity: 400.0 > + > +### worker.* configs are for task workers > +worker.heap.memory.mb: 768 > +worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails > -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps > -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M > -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump" > +worker.gc.childopts: "" > + > +# Unlocking commercial features requires a special license from > Oracle. > +# See http://www.oracle.com/technetwork/java/javase/terms/ > products/index.html > +# For this reason, profiler features are disabled by default. > +worker.profiler.enabled: false > +worker.profiler.childopts: "-XX:+UnlockCommercialFeatures > -XX:+FlightRecorder" > +worker.profiler.command: "flight.bash" > +worker.heartbeat.frequency.secs: 1 > + > +# check whether dynamic log levels can be reset from DEBUG to INFO in > workers > +worker.log.level.reset.poll.secs: 30 > + > +# control how many worker receiver threads we need per worker > +topology.worker.receiver.thread.count: 1 > + > +task.heartbeat.frequency.secs: 3 > +task.refresh.poll.secs: 10 > +task.credentials.poll.secs: 30 > + > +# now should be null by default > +topology.backpressure.enable: true > +backpressure.disruptor.high.watermark: 0.9 > +backpressure.disruptor.low.watermark: 0.4 > + > +zmq.threads: 1 > +zmq.linger.millis: 5000 > +zmq.hwm: 0 > + > + > +storm.messaging.netty.server_worker_threads: 1 > +storm.messaging.netty.client_worker_threads: 1 > +storm.messaging.netty.buffer_size: 5242880 #5MB buffer > +# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs > are 120, other workers should also wait at least that long before giving up > on connecting to the other worker. The reconnection period need also be > bigger than storm.zookeeper.session.timeout(default is 20s), so that we > can abort the reconnection when the target worker is dead. > +storm.messaging.netty.max_retries: 300 > +storm.messaging.netty.max_wait_ms: 1000 > +storm.messaging.netty.min_wait_ms: 100 > + > +# If the Netty messaging layer is busy(netty internal buffer not > writable), the Netty client will try to batch message as more as possible > up to the size of storm.messaging.netty.transfer.batch.size bytes, > otherwise it will try to flush message as soon as possible to reduce > latency. > +storm.messaging.netty.transfer.batch.size: 262144 > +# Sets the backlog value to specify when the channel binds to a local > address > +storm.messaging.netty.socket.backlog: 500 > + > +# By default, the Netty SASL authentication is set to false. Users > can override and set it true for a specific topology. > +storm.messaging.netty.authentication: false > + > +# Default plugin to use for automatic network topology discovery > +storm.network.topography.plugin: org.apache.storm.networktopography. > DefaultRackDNSToSwitchMapping > + > +# default number of seconds group mapping service will cache user > group > +storm.group.mapping.service.cache.duration.secs: 120 > + > +### topology.* configs are for specific executing storms > +topology.enable.message.timeouts: true > +topology.debug: false > +topology.workers: 1 > +topology.acker.executors: null > +topology.eventlogger.executors: 0 > +topology.tasks: null > +# maximum amount of time a message has to complete before it's > considered failed > +topology.message.timeout.secs: 30 > +topology.multilang.serializer: "org.apache.storm.multilang. > JsonSerializer" > +topology.shellbolt.max.pending: 100 > +topology.skip.missing.kryo.registrations: false > +topology.max.task.parallelism: null > +topology.max.spout.pending: null > +topology.state.synchronization.timeout.secs: 60 > +topology.stats.sample.rate: 0.05 > +topology.builtin.metrics.bucket.size.secs: 60 > +topology.fall.back.on.java.serialization: true > +topology.worker.childopts: null > +topology.worker.logwriter.childopts: "-Xmx64m" > +topology.executor.receive.buffer.size: 1024 #batched > +topology.executor.send.buffer.size: 1024 #individual messages > +topology.transfer.buffer.size: 1024 # batched > +topology.tick.tuple.freq.secs: null > +topology.worker.shared.thread.pool.size: 4 > +topology.spout.wait.strategy: "org.apache.storm.spout. > SleepSpoutWaitStrategy" > +topology.sleep.spout.wait.strategy.time.ms: 1 > +topology.error.throttle.interval.secs: 10 > +topology.max.error.report.per.interval: 5 > +topology.kryo.factory: "org.apache.storm.serialization. > DefaultKryoFactory" > +topology.tuple.serializer: "org.apache.storm.serialization.types. > ListDelegateSerializer" > +topology.trident.batch.emit.interval.millis: 500 > +topology.testing.always.try.serialize: false > +topology.classpath: null > +topology.environment: null > +topology.bolts.outgoing.overflow.buffer.enable: false > +topology.disruptor.wait.timeout.millis: 1000 > +topology.disruptor.batch.size: 100 > +topology.disruptor.batch.timeout.millis: 1 > +topology.disable.loadaware: false > +topology.state.checkpoint.interval.ms: 1000 > + > +# Configs for Resource Aware Scheduler > +# topology priority describing the importance of the topology in > decreasing importance starting from 0 (i.e. 0 is the highest priority and > the priority importance decreases as the priority number increases). > +# Recommended range of 0-29 but no hard limit set. > +topology.priority: 29 > +topology.component.resources.onheap.memory.mb: 128.0 > +topology.component.resources.offheap.memory.mb: 0.0 > +topology.component.cpu.pcore.percent: 10.0 > +topology.worker.max.heap.size.mb: 768.0 > +topology.scheduler.strategy: "org.apache.storm.scheduler. > resource.strategies.scheduling.DefaultResourceAwareStrategy" > +resource.aware.scheduler.eviction.strategy: > "org.apache.storm.scheduler.resource.strategies.eviction. > DefaultEvictionStrategy" > +resource.aware.scheduler.priority.strategy: > "org.apache.storm.scheduler.resource.strategies.priority. > DefaultSchedulingPriorityStrategy" > + > +dev.zookeeper.path: "/tmp/dev-storm-zookeeper" > + > +pacemaker.host: "localhost" > +pacemaker.port: 6699 > +pacemaker.base.threads: 10 > +pacemaker.max.threads: 50 > +pacemaker.thread.timeout: 10 > +pacemaker.childopts: "-Xmx1024m" > +pacemaker.auth.method: "NONE" > +pacemaker.kerberos.users: [] > + > +#default storm daemon metrics reporter plugins > +storm.daemon.metrics.reporter.plugins: > + - "org.apache.storm.daemon.metrics.reporters. > JmxPreparableReporter" > > http://git-wip-us.apache.org/repos/asf/incubator-ranger/ > blob/7729373d/storm-agent/src/test/resources/words.txt > ---------------------------------------------------------------------- > diff --git a/storm-agent/src/test/resources/words.txt > b/storm-agent/src/test/resources/words.txt > new file mode 100644 > index 0000000..c7725df > --- /dev/null > +++ b/storm-agent/src/test/resources/words.txt > @@ -0,0 +1,27 @@ > +???# Licensed to the Apache Software Foundation (ASF) under one > +# or more contributor license agreements. See the NOTICE file > +# distributed with this work for additional information > +# regarding copyright ownership. The ASF licenses this file > +# to you under the Apache License, Version 2.0 (the > +# "License"); you may not use this file except in compliance > +# with the License. You may obtain a copy of the License at > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, software > +# distributed under the License is distributed on an "AS IS" BASIS, > +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > +# See the License for the specific language governing permissions and > +# limitations under the License. > +the > +and > +storm > +random > +word > +spout > +test > +apache > +the > +spoon > +and > +the > \ No newline at end of file > > > > > > > -- Colm O hEigeartaigh Talend Community Coder http://coders.talend.com
