Hi Madhan,

The build errors only occur with JDK 1.7, which is why I didn't spot them.
Upgrading Storm from 1.0.0 -> 1.0.2 fixes the problem, which suggests that
the build errors are due to a bug in Storm. Any objections if I perform
this upgrade?
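
For reference, a minimal sketch of what the upgrade would likely amount to. It assumes the Storm version is controlled by a single `storm.version` property; storm-agent/pom.xml references `${storm.version}`, but the location of the property definition (the root pom.xml) is an assumption here:

```xml
<!-- Root pom.xml (assumed location of the property definition) -->
<properties>
    <!-- previously 1.0.0 -->
    <storm.version>1.0.2</storm.version>
</properties>
```

Since the storm-core dependency picks up its version from this property, no change to storm-agent/pom.xml itself should be needed.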

Colm.

On Mon, Aug 29, 2016 at 6:47 PM, Madhan Neethiraj <
[email protected]> wrote:

> Colm,
>
> After this commit, master build fails with the following error. Can you
> please review?
>
> Thanks,
> Madhan
>
> 19276 [Thread-12] ERROR o.a.s.event - Error when processing event
> org.apache.storm.generated.AuthorizationException
>         at org.apache.storm.blobstore.BlobStoreAclHandler.hasPermissions(BlobStoreAclHandler.java:292) ~[storm-core-1.0.0.jar:1.0.0]
>         at org.apache.storm.blobstore.LocalFsBlobStore.getBlob(LocalFsBlobStore.java:240) ~[storm-core-1.0.0.jar:1.0.0]
>         at org.apache.storm.blobstore.BlobStore.readBlobTo(BlobStore.java:271) ~[storm-core-1.0.0.jar:1.0.0]
>         at org.apache.storm.daemon.supervisor$fn__9351$fn__9352.invoke(supervisor.clj:1176) ~[storm-core-1.0.0.jar:1.0.0]
>         at org.apache.storm.daemon.supervisor$fn__9351.invoke(supervisor.clj:1172) ~[storm-core-1.0.0.jar:1.0.0]
>         at clojure.lang.MultiFn.invoke(MultiFn.java:243) ~[clojure-1.7.0.jar:?]
>         at org.apache.storm.daemon.supervisor$mk_synchronize_supervisor$this__9070$fn__9088.invoke(supervisor.clj:582) ~[storm-core-1.0.0.jar:1.0.0]
>         at org.apache.storm.daemon.supervisor$mk_synchronize_supervisor$this__9070.invoke(supervisor.clj:581) ~[storm-core-1.0.0.jar:1.0.0]
>         at org.apache.storm.event$event_manager$fn__8622.invoke(event.clj:40) [storm-core-1.0.0.jar:1.0.0]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.7.0_71]
> 19277 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
> 19278 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x156d760f67c0010
> 19279 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x156d760f67c0010 closed
> 19279 [main-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
> 19279 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /fe80:0:0:0:0:0:0:1%1:56076 which had sessionid 0x156d760f67c0010
> 19279 [main] INFO  o.a.s.cluster - setup-path/blobstore/temp2-2-1472492282-stormcode.ser/10.22.8.99:6627-1
> 19283 [Thread-12] ERROR o.a.s.util - Halting process: ("Error when processing an event")
> java.lang.RuntimeException: ("Error when processing an event")
>         at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.0.jar:1.0.0]
>         at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>         at org.apache.storm.event$event_manager$fn__8622.invoke(event.clj:48) [storm-core-1.0.0.jar:1.0.0]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.7.0_71]
>
> Results :
>
> Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
>
>
>
>
>
> On 8/29/16, 7:25 AM, "[email protected]" <[email protected]> wrote:
>
>     Repository: incubator-ranger
>     Updated Branches:
>       refs/heads/master b15d6fa8a -> 7729373de
>
>
>     RANGER-1133 - Add tests for the Storm plugin
>
>     Signed-off-by: Selvamohan Neethiraj <[email protected]>
>
>
>     Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
>     Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7729373d
>     Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7729373d
>     Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7729373d
>
>     Branch: refs/heads/master
>     Commit: 7729373dea5a560f5cb4b24433d12b25a9dc7519
>     Parents: b15d6fa
>     Author: Colm O hEigeartaigh <[email protected]>
>     Authored: Tue Aug 2 12:25:55 2016 +0100
>     Committer: Selvamohan Neethiraj <[email protected]>
>     Committed: Mon Aug 29 10:13:59 2016 -0400
>
>     ----------------------------------------------------------------------
>      .../ranger/plugin/util/RangerResourceTrie.java  |   2 +-
>      .../security/listener/SpringEventListener.java  |   3 +-
>      storm-agent/pom.xml                             |  27 +-
>      .../storm/RangerAdminClientImpl.java            |  84 +++++
>      .../storm/StormRangerAuthorizerTest.java        | 190 +++++++++++
>      .../authorization/storm/WordCounterBolt.java    |  66 ++++
>      .../ranger/authorization/storm/WordSpout.java   |  68 ++++
>      .../test/resources/ranger-storm-security.xml    |  45 +++
>      .../src/test/resources/storm-policies.json      | 337 +++++++++++++++++++
>      storm-agent/src/test/resources/storm.yaml       | 289 ++++++++++++++++
>      storm-agent/src/test/resources/words.txt        |  27 ++
>      11 files changed, 1132 insertions(+), 6 deletions(-)
>     ----------------------------------------------------------------------
>
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/agents-common/src/main/java/org/apache/ranger/plugin/util/
> RangerResourceTrie.java
>     ----------------------------------------------------------------------
>     diff --git a/agents-common/src/main/java/
> org/apache/ranger/plugin/util/RangerResourceTrie.java
> b/agents-common/src/main/java/org/apache/ranger/plugin/util/
> RangerResourceTrie.java
>     index 809c07e..a5ffd1a 100644
>     --- a/agents-common/src/main/java/org/apache/ranger/plugin/util/
> RangerResourceTrie.java
>     +++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/
> RangerResourceTrie.java
>     @@ -59,7 +59,7 @@ public class RangerResourceTrie {
>
>              this.resourceName  = resourceDef.getName();
>              this.optIgnoreCase = strIgnoreCase != null ?
> Boolean.parseBoolean(strIgnoreCase) : false;
>     -        this.optWildcard   = strWildcard != null ?
> Boolean.parseBoolean(strWildcard) : false;;
>     +        this.optWildcard   = strWildcard != null ?
> Boolean.parseBoolean(strWildcard) : false;
>              this.wildcardChars = optWildcard ? DEFAULT_WILDCARD_CHARS :
> "";
>              this.root          = new TrieNode(Character.valueOf((
> char)0));
>
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/security-admin/src/main/java/org/apache/
> ranger/security/listener/SpringEventListener.java
>     ----------------------------------------------------------------------
>     diff --git a/security-admin/src/main/java/org/apache/ranger/
> security/listener/SpringEventListener.java b/security-admin/src/main/
> java/org/apache/ranger/security/listener/SpringEventListener.java
>     index 5876445..29a35cf 100644
>     --- a/security-admin/src/main/java/org/apache/ranger/
> security/listener/SpringEventListener.java
>     +++ b/security-admin/src/main/java/org/apache/ranger/
> security/listener/SpringEventListener.java
>     @@ -50,9 +50,8 @@ public class SpringEventListener implements
>                 process((AuthenticationFailureBadCredentialsEvent) event);
>             } else if (event instanceof AuthenticationFailureDisabledEvent)
> {
>                 process((AuthenticationFailureDisabledEvent) event);
>     -       } else {
>     -           // igonre all other events
>             }
>     +       // igonre all other events
>
>         } catch (Exception e) {
>             logger.error("Exception in Spring Event Listener.", e);
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/storm-agent/pom.xml
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/pom.xml b/storm-agent/pom.xml
>     index b975a92..d49c2fe 100644
>     --- a/storm-agent/pom.xml
>     +++ b/storm-agent/pom.xml
>     @@ -35,6 +35,12 @@
>                  <groupId>org.apache.storm</groupId>
>                  <artifactId>storm-core</artifactId>
>                  <version>${storm.version}</version>
>     +            <exclusions>
>     +                <exclusion>
>     +                    <groupId>org.slf4j</groupId>
>     +                    <artifactId>log4j-over-slf4j</artifactId>
>     +                </exclusion>
>     +            </exclusions>
>              </dependency>
>              <dependency>
>                  <groupId>org.apache.ranger</groupId>
>     @@ -93,11 +99,26 @@
>                         <groupId>org.apache.httpcomponents</groupId>
>                         <artifactId>httpcore</artifactId>
>                         <version>${httpcomponents.
> httpcore.version}</version>
>     -           </dependency>
>     -           <dependency>
>     +   </dependency>
>     +   <dependency>
>                  <groupId>commons-codec</groupId>
>                  <artifactId>commons-codec</artifactId>
>                  <version>${commons.codec.version}</version>
>     -           </dependency>
>     +   </dependency>
>     +        <dependency>
>     +            <groupId>junit</groupId>
>     +            <artifactId>junit</artifactId>
>     +        </dependency>
>          </dependencies>
>     +    <build>
>     +        <testResources>
>     +           <testResource>
>     +                <directory>src/test/resources</directory>
>     +                <includes>
>     +                    <include>**/*</include>
>     +                </includes>
>     +                <filtering>true</filtering>
>     +            </testResource>
>     +        </testResources>
>     +    </build>
>      </project>
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/RangerAdminClientImpl.java
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/RangerAdminClientImpl.java
> b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/
> RangerAdminClientImpl.java
>     new file mode 100644
>     index 0000000..e6c289a
>     --- /dev/null
>     +++ b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/
> RangerAdminClientImpl.java
>     @@ -0,0 +1,84 @@
>     +/*
>     + * Licensed to the Apache Software Foundation (ASF) under one or more
>     + * contributor license agreements.  See the NOTICE file distributed
> with
>     + * this work for additional information regarding copyright ownership.
>     + * The ASF licenses this file to You under the Apache License,
> Version 2.0
>     + * (the "License"); you may not use this file except in compliance
> with
>     + * the License.  You may obtain a copy of the License at
>     + *
>     + *      http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing, software
>     + * distributed under the License is distributed on an "AS IS" BASIS,
>     + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
>     + * See the License for the specific language governing permissions and
>     + * limitations under the License.
>     + */
>     +
>     +package org.apache.ranger.authorization.storm;
>     +
>     +import java.io.File;
>     +import java.nio.file.FileSystems;
>     +import java.nio.file.Files;
>     +import java.util.List;
>     +
>     +import org.apache.ranger.admin.client.RangerAdminClient;
>     +import org.apache.ranger.plugin.util.GrantRevokeRequest;
>     +import org.apache.ranger.plugin.util.ServicePolicies;
>     +import org.apache.ranger.plugin.util.ServiceTags;
>     +import org.slf4j.Logger;
>     +import org.slf4j.LoggerFactory;
>     +
>     +import com.google.gson.Gson;
>     +import com.google.gson.GsonBuilder;
>     +
>     +/**
>     + * A test implementation of the RangerAdminClient interface that just
> reads policies in from a file and returns them
>     + */
>     +public class RangerAdminClientImpl implements RangerAdminClient {
>     +    private static final Logger LOG = LoggerFactory.getLogger(
> RangerAdminClientImpl.class);
>     +    private final static String cacheFilename = "storm-policies.json";
>     +    private Gson gson;
>     +
>     +    public void init(String serviceName, String appId, String
> configPropertyPrefix) {
>     +        Gson gson = null;
>     +        try {
>     +            gson = new GsonBuilder().setDateFormat("
> yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
>     +        } catch(Throwable excp) {
>     +            LOG.error("RangerAdminClientImpl: failed to create
> GsonBuilder object", excp);
>     +        }
>     +        this.gson = gson;
>     +    }
>     +
>     +    public ServicePolicies getServicePoliciesIfUpdated(long
> lastKnownVersion) throws Exception {
>     +
>     +        String basedir = System.getProperty("basedir");
>     +        if (basedir == null) {
>     +            basedir = new File(".").getCanonicalPath();
>     +        }
>     +
>     +        java.nio.file.Path cachePath = 
> FileSystems.getDefault().getPath(basedir,
> "/src/test/resources/" + cacheFilename);
>     +        byte[] cacheBytes = Files.readAllBytes(cachePath);
>     +
>     +        return gson.fromJson(new String(cacheBytes),
> ServicePolicies.class);
>     +    }
>     +
>     +    public void grantAccess(GrantRevokeRequest request) throws
> Exception {
>     +
>     +    }
>     +
>     +    public void revokeAccess(GrantRevokeRequest request) throws
> Exception {
>     +
>     +    }
>     +
>     +    public ServiceTags getServiceTagsIfUpdated(long lastKnownVersion)
> throws Exception {
>     +        return null;
>     +
>     +    }
>     +
>     +    public List<String> getTagTypes(String tagTypePattern) throws
> Exception {
>     +        return null;
>     +    }
>     +
>     +
>     +}
>     \ No newline at end of file
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/StormRangerAuthorizerTest.java
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/StormRangerAuthorizerTest.java
> b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/
> StormRangerAuthorizerTest.java
>     new file mode 100644
>     index 0000000..0c249c5
>     --- /dev/null
>     +++ b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/
> StormRangerAuthorizerTest.java
>     @@ -0,0 +1,190 @@
>     +/*
>     + * Licensed to the Apache Software Foundation (ASF) under one or more
>     + * contributor license agreements.  See the NOTICE file distributed
> with
>     + * this work for additional information regarding copyright ownership.
>     + * The ASF licenses this file to You under the Apache License,
> Version 2.0
>     + * (the "License"); you may not use this file except in compliance
> with
>     + * the License.  You may obtain a copy of the License at
>     + *
>     + *      http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing, software
>     + * distributed under the License is distributed on an "AS IS" BASIS,
>     + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
>     + * See the License for the specific language governing permissions and
>     + * limitations under the License.
>     + */
>     +
>     +package org.apache.ranger.authorization.storm;
>     +
>     +import java.security.Principal;
>     +import java.security.PrivilegedExceptionAction;
>     +
>     +import javax.security.auth.Subject;
>     +
>     +import org.apache.storm.Config;
>     +import org.apache.storm.LocalCluster;
>     +import org.apache.storm.generated.RebalanceOptions;
>     +import org.apache.storm.topology.TopologyBuilder;
>     +import org.junit.Assert;
>     +
>     +/**
>     + * A simple test that wires a WordSpout + WordCounterBolt into a
> topology and runs it. The "RangerStormAuthorizer" takes care of
> authorization.
>     + * The policies state that "bob" can do anything with the
> "word-count" topology. In addition, "bob" can create/kill the "temp*"
> topologies, but do
>     + * nothing else.
>     + */
>     +public class StormRangerAuthorizerTest {
>     +
>     +    private static LocalCluster cluster;
>     +
>     +    @org.junit.BeforeClass
>     +    public static void setup() throws Exception {
>     +        cluster = new LocalCluster();
>     +
>     +        final Config conf = new Config();
>     +        conf.setDebug(true);
>     +
>     +        final TopologyBuilder builder = new TopologyBuilder();
>     +        builder.setSpout("words", new WordSpout());
>     +        builder.setBolt("counter", new WordCounterBolt()).
> shuffleGrouping("words");
>     +
>     +        // bob can create a new topology
>     +        final Subject subject = new Subject();
>     +        subject.getPrincipals().add(new SimplePrincipal("bob"));
>     +        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
>     +            public Void run() throws Exception {
>     +                cluster.submitTopology("word-count", conf,
> builder.createTopology());
>     +                return null;
>     +            }
>     +        });
>     +
>     +    }
>     +
>     +    @org.junit.AfterClass
>     +    public static void cleanup() throws Exception {
>     +        final Subject subject = new Subject();
>     +        subject.getPrincipals().add(new SimplePrincipal("bob"));
>     +        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
>     +            public Void run() throws Exception {
>     +                cluster.killTopology("word-count");
>     +                return null;
>     +            }
>     +        });
>     +
>     +        cluster.shutdown();
>     +        System.clearProperty("storm.conf.file");
>     +    }
>     +
>     +    // "bob" can't create topologies other than "word-count" and
> "temp*"
>     +    @org.junit.Test
>     +    public void testCreateTopologyBob() throws Exception {
>     +        final Config conf = new Config();
>     +        conf.setDebug(true);
>     +
>     +        final TopologyBuilder builder = new TopologyBuilder();
>     +        builder.setSpout("words", new WordSpout());
>     +        builder.setBolt("counter", new WordCounterBolt()).
> shuffleGrouping("words");
>     +
>     +        final Subject subject = new Subject();
>     +        subject.getPrincipals().add(new SimplePrincipal("bob"));
>     +        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
>     +            public Void run() throws Exception {
>     +                try {
>     +                    cluster.submitTopology("word-count2", conf,
> builder.createTopology());
>     +                    Assert.fail("Authorization failure expected");
>     +                } catch (Throwable ex) {
>     +                    // expected
>     +                }
>     +
>     +                return null;
>     +            }
>     +        });
>     +    }
>     +
>     +    @org.junit.Test
>     +    public void testTopologyActivation() throws Exception {
>     +        final Subject subject = new Subject();
>     +        subject.getPrincipals().add(new SimplePrincipal("bob"));
>     +        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
>     +            public Void run() throws Exception {
>     +
>     +                // Deactivate "word-count"
>     +                cluster.deactivate("word-count");
>     +
>     +                // Create a new topology called "temp1"
>     +                final Config conf = new Config();
>     +                conf.setDebug(true);
>     +
>     +                final TopologyBuilder builder = new TopologyBuilder();
>     +                builder.setSpout("words", new WordSpout());
>     +                builder.setBolt("counter", new WordCounterBolt()).
> shuffleGrouping("words");
>     +                cluster.submitTopology("temp1", conf,
> builder.createTopology());
>     +
>     +                // Try to deactivate "temp1"
>     +                try {
>     +                    cluster.deactivate("temp1");
>     +                    Assert.fail("Authorization failure expected");
>     +                } catch (Throwable ex) {
>     +                    // expected
>     +                }
>     +
>     +                // Re-activate "word-count"
>     +                cluster.activate("word-count");
>     +
>     +                // Kill temp1
>     +                cluster.killTopology("temp1");
>     +
>     +                return null;
>     +            }
>     +        });
>     +    }
>     +
>     +    @org.junit.Test
>     +    public void testTopologyRebalancing() throws Exception {
>     +        final Subject subject = new Subject();
>     +        subject.getPrincipals().add(new SimplePrincipal("bob"));
>     +        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
>     +            public Void run() throws Exception {
>     +                RebalanceOptions options = new RebalanceOptions();
>     +
>     +                // Create a new topology called "temp2"
>     +                final Config conf = new Config();
>     +                conf.setDebug(true);
>     +
>     +                final TopologyBuilder builder = new TopologyBuilder();
>     +                builder.setSpout("words", new WordSpout());
>     +                builder.setBolt("counter", new WordCounterBolt()).
> shuffleGrouping("words");
>     +                cluster.submitTopology("temp2", conf,
> builder.createTopology());
>     +
>     +                // Try to rebalance "temp2"
>     +                try {
>     +                    cluster.rebalance("temp2", options);
>     +                    Assert.fail("Authorization failure expected");
>     +                } catch (Throwable ex) {
>     +                    // expected
>     +                }
>     +
>     +                // Kill temp2
>     +                cluster.killTopology("temp2");
>     +
>     +                return null;
>     +            }
>     +        });
>     +    }
>     +
>     +
>     +    private static class SimplePrincipal implements Principal {
>     +
>     +        private final String name;
>     +
>     +        public SimplePrincipal(String name) {
>     +            this.name = name;
>     +        }
>     +
>     +        @Override
>     +        public String getName() {
>     +            return name;
>     +        }
>     +
>     +    }
>     +}
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/WordCounterBolt.java
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/WordCounterBolt.java b/storm-agent/src/test/java/
> org/apache/ranger/authorization/storm/WordCounterBolt.java
>     new file mode 100644
>     index 0000000..0e327c7
>     --- /dev/null
>     +++ b/storm-agent/src/test/java/org/apache/ranger/authorization/storm/
> WordCounterBolt.java
>     @@ -0,0 +1,66 @@
>     +/*
>     + * Licensed to the Apache Software Foundation (ASF) under one or more
>     + * contributor license agreements.  See the NOTICE file distributed
> with
>     + * this work for additional information regarding copyright ownership.
>     + * The ASF licenses this file to You under the Apache License,
> Version 2.0
>     + * (the "License"); you may not use this file except in compliance
> with
>     + * the License.  You may obtain a copy of the License at
>     + *
>     + *      http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing, software
>     + * distributed under the License is distributed on an "AS IS" BASIS,
>     + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
>     + * See the License for the specific language governing permissions and
>     + * limitations under the License.
>     + */
>     +
>     +package org.apache.ranger.authorization.storm;
>     +
>     +import java.util.HashMap;
>     +import java.util.Map;
>     +
>     +import org.apache.storm.task.OutputCollector;
>     +import org.apache.storm.task.TopologyContext;
>     +import org.apache.storm.topology.OutputFieldsDeclarer;
>     +import org.apache.storm.topology.base.BaseRichBolt;
>     +import org.apache.storm.tuple.Fields;
>     +import org.apache.storm.tuple.Tuple;
>     +import org.apache.storm.tuple.Values;
>     +
>     +/**
>     + * A Storm Bolt which reads in a word and counts it + outputs the
> word + current count
>     + */
>     +public class WordCounterBolt extends BaseRichBolt {
>     +    private OutputCollector outputCollector;
>     +    private Map<String, Integer> countMap = new HashMap<>();
>     +
>     +    @Override
>     +    public void execute(Tuple tuple) {
>     +        String word = tuple.getString(0);
>     +
>     +        int count = 0;
>     +        if (countMap.containsKey(word)) {
>     +            count = countMap.get(word);
>     +            count++;
>     +        }
>     +        count++;
>     +        countMap.put(word, count);
>     +
>     +        outputCollector.emit(new Values(word, count));
>     +        outputCollector.ack(tuple);
>     +
>     +    }
>     +
>     +    @Override
>     +    public void prepare(Map arg0, TopologyContext arg1,
> OutputCollector outputCollector) {
>     +        this.outputCollector = outputCollector;
>     +    }
>     +
>     +    @Override
>     +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     +        declarer.declare(new Fields("word", "count"));
>     +    }
>     +
>     +
>     +}
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/WordSpout.java
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/WordSpout.java b/storm-agent/src/test/java/
> org/apache/ranger/authorization/storm/WordSpout.java
>     new file mode 100644
>     index 0000000..5f0b2cf
>     --- /dev/null
>     +++ b/storm-agent/src/test/java/org/apache/ranger/
> authorization/storm/WordSpout.java
>     @@ -0,0 +1,68 @@
>     +/*
>     + * Licensed to the Apache Software Foundation (ASF) under one or more
>     + * contributor license agreements.  See the NOTICE file distributed
> with
>     + * this work for additional information regarding copyright ownership.
>     + * The ASF licenses this file to You under the Apache License,
> Version 2.0
>     + * (the "License"); you may not use this file except in compliance
> with
>     + * the License.  You may obtain a copy of the License at
>     + *
>     + *      http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing, software
>     + * distributed under the License is distributed on an "AS IS" BASIS,
>     + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
>     + * See the License for the specific language governing permissions and
>     + * limitations under the License.
>     + */
>     +
>     +package org.apache.ranger.authorization.storm;
>     +
>     +import java.util.List;
>     +import java.util.Map;
>     +
>     +import org.apache.storm.shade.org.apache.commons.io.IOUtils;
>     +import org.apache.storm.spout.SpoutOutputCollector;
>     +import org.apache.storm.task.TopologyContext;
>     +import org.apache.storm.topology.OutputFieldsDeclarer;
>     +import org.apache.storm.topology.base.BaseRichSpout;
>     +import org.apache.storm.tuple.Fields;
>     +import org.apache.storm.tuple.Values;
>     +
>     +/**
>     + * A Storm Spout which reads in words.txt + emits a word from it
> (sequentially)
>     + */
>     +public class WordSpout extends BaseRichSpout {
>     +    private final List<String> words;
>     +    private SpoutOutputCollector collector;
>     +    private int line = 0;
>     +
>     +    public WordSpout() throws Exception {
>     +        java.io.File inputFile = new java.io.File(WordSpout.class.
> getResource("../../../../../words.txt").toURI());
>     +        words = IOUtils.readLines(new java.io.FileInputStream(
> inputFile));
>     +    }
>     +
>     +    @Override
>     +    public void nextTuple() {
>     +        if (line < words.size()) {
>     +           String lineVal = words.get(line++);
>     +           while (lineVal.startsWith("#") && line < words.size()) {
>     +                   lineVal = words.get(line++);
>     +           }
>     +           if (lineVal != null) {
>     +                   collector.emit(new Values(lineVal.trim()));
>     +           }
>     +        }
>     +    }
>     +
>     +    @Override
>     +    public void open(Map arg0, TopologyContext arg1,
> SpoutOutputCollector collector) {
>     +        this.collector = collector;
>     +    }
>     +
>     +    @Override
>     +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     +        declarer.declare(new Fields("word"));
>     +    }
>     +
>     +
>     +}
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/
> blob/7729373d/storm-agent/src/test/resources/ranger-storm-security.xml
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/resources/ranger-storm-security.xml
> b/storm-agent/src/test/resources/ranger-storm-security.xml
>     new file mode 100644
>     index 0000000..adff2b9
>     --- /dev/null
>     +++ b/storm-agent/src/test/resources/ranger-storm-security.xml
>     @@ -0,0 +1,45 @@
>     +<?xml version="1.0"?>
>     +<!--
>     +  Licensed to the Apache Software Foundation (ASF) under one or more
>     +  contributor license agreements.  See the NOTICE file distributed
> with
>     +  this work for additional information regarding copyright ownership.
>     +  The ASF licenses this file to You under the Apache License, Version
> 2.0
>     +  (the "License"); you may not use this file except in compliance with
>     +  the License.  You may obtain a copy of the License at
>     +
>     +      http://www.apache.org/licenses/LICENSE-2.0
>     +
>     +  Unless required by applicable law or agreed to in writing, software
>     +  distributed under the License is distributed on an "AS IS" BASIS,
>     +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
>     +  See the License for the specific language governing permissions and
>     +  limitations under the License.
>     +-->
>     +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>     +<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
>     +
>     +   <property>
>     +           <name>ranger.plugin.storm.service.name</name>
>     +           <value>StormTest</value>
>     +           <description>
>     +                   Name of the Ranger service containing policies for
> this SampleApp instance
>     +           </description>
>     +   </property>
>     +
>     +   <property>
>     +        <name>ranger.plugin.storm.policy.source.impl</name>
>     +        <value>org.apache.ranger.authorization.storm.
> RangerAdminClientImpl</value>
>     +        <description>
>     +            Policy source.
>     +        </description>
>     +    </property>
>     +
>     +   <property>
>     +           <name>ranger.plugin.storm.policy.cache.dir</name>
>     +           <value>${project.build.directory}</value>
>     +           <description>
>     +                   Directory where Ranger policies are cached after successful retrieval from the source
>     +           </description>
>     +   </property>
>     +
>     +</configuration>
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7729373d/storm-agent/src/test/resources/storm-policies.json
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/resources/storm-policies.json b/storm-agent/src/test/resources/storm-policies.json
>     new file mode 100644
>     index 0000000..5c04b5d
>     --- /dev/null
>     +++ b/storm-agent/src/test/resources/storm-policies.json
>     @@ -0,0 +1,337 @@
>     +{
>     +  "serviceName": "StormTest",
>     +  "serviceId": 10,
>     +  "policyVersion": 12,
>     +  "policyUpdateTime": "20160704-15:53:20.000-+0100",
>     +  "policies": [
>     +    {
>     +      "service": "StormTest",
>     +      "name": "WordCount",
>     +      "description": "",
>     +      "resourceSignature": "25dc26943b5859a6e5f904388cd02830",
>     +      "isAuditEnabled": true,
>     +      "resources": {
>     +        "topology": {
>     +          "values": [
>     +            "word-count"
>     +          ],
>     +          "isExcludes": false,
>     +          "isRecursive": false
>     +        }
>     +      },
>     +      "policyItems": [
>     +        {
>     +          "accesses": [
>     +            {
>     +              "type": "submitTopology",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "fileUpload",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "getNimbusConf",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "getClusterInfo",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "fileDownload",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "killTopology",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "rebalance",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "activate",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "deactivate",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "getTopologyConf",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "getTopology",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "getUserTopology",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "getTopologyInfo",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "uploadNewCredentials",
>     +              "isAllowed": true
>     +            }
>     +          ],
>     +          "users": [
>     +            "bob"
>     +          ],
>     +          "groups": [],
>     +          "conditions": [],
>     +          "delegateAdmin": false
>     +        }
>     +      ],
>     +      "denyPolicyItems": [],
>     +      "allowExceptions": [],
>     +      "denyExceptions": [],
>     +      "dataMaskPolicyItems": [],
>     +      "rowFilterPolicyItems": [],
>     +      "id": 40,
>     +      "guid": "1467386247700_275_2404",
>     +      "isEnabled": true,
>     +      "createdBy": "Admin",
>     +      "updatedBy": "Admin",
>     +      "createTime": "20160701-16:17:27.000-+0100",
>     +      "updateTime": "20160704-14:51:01.000-+0100",
>     +      "version": 6
>     +    },
>     +    {
>     +      "service": "StormTest",
>     +      "name": "TempPolicy",
>     +      "description": "",
>     +      "resourceSignature": "1e4cafdc98da3cec11b565ef03cfab14",
>     +      "isAuditEnabled": true,
>     +      "resources": {
>     +        "topology": {
>     +          "values": [
>     +            "temp*"
>     +          ],
>     +          "isExcludes": false,
>     +          "isRecursive": false
>     +        }
>     +      },
>     +      "policyItems": [
>     +        {
>     +          "accesses": [
>     +            {
>     +              "type": "submitTopology",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "getClusterInfo",
>     +              "isAllowed": true
>     +            },
>     +            {
>     +              "type": "killTopology",
>     +              "isAllowed": true
>     +            }
>     +          ],
>     +          "users": [
>     +            "bob"
>     +          ],
>     +          "groups": [],
>     +          "conditions": [],
>     +          "delegateAdmin": false
>     +        }
>     +      ],
>     +      "denyPolicyItems": [],
>     +      "allowExceptions": [],
>     +      "denyExceptions": [],
>     +      "dataMaskPolicyItems": [],
>     +      "rowFilterPolicyItems": [],
>     +      "id": 42,
>     +      "guid": "1467641649473_569_2619",
>     +      "isEnabled": true,
>     +      "createdBy": "Admin",
>     +      "updatedBy": "Admin",
>     +      "createTime": "20160704-15:14:09.000-+0100",
>     +      "updateTime": "20160704-15:53:20.000-+0100",
>     +      "version": 5
>     +    }
>     +  ],
>     +  "serviceDef": {
>     +    "name": "storm",
>     +    "implClass": "org.apache.ranger.services.storm.RangerServiceStorm",
>     +    "label": "Storm",
>     +    "description": "Storm",
>     +    "options": {},
>     +    "configs": [
>     +      {
>     +        "itemId": 1,
>     +        "name": "username",
>     +        "type": "string",
>     +        "mandatory": true,
>     +        "validationRegEx": "",
>     +        "validationMessage": "",
>     +        "uiHint": "",
>     +        "label": "Username"
>     +      },
>     +      {
>     +        "itemId": 2,
>     +        "name": "password",
>     +        "type": "password",
>     +        "mandatory": true,
>     +        "validationRegEx": "",
>     +        "validationMessage": "",
>     +        "uiHint": "",
>     +        "label": "Password"
>     +      },
>     +      {
>     +        "itemId": 3,
>     +        "name": "nimbus.url",
>     +        "type": "string",
>     +        "mandatory": true,
>     +        "defaultValue": "",
>     +        "validationRegEx": "",
>     +        "validationMessage": "",
>     +        "uiHint": "",
>     +        "label": "Nimbus URL"
>     +      },
>     +      {
>     +        "itemId": 4,
>     +        "name": "commonNameForCertificate",
>     +        "type": "string",
>     +        "mandatory": false,
>     +        "validationRegEx": "",
>     +        "validationMessage": "",
>     +        "uiHint": "",
>     +        "label": "Common Name for Certificate"
>     +      }
>     +    ],
>     +    "resources": [
>     +      {
>     +        "itemId": 1,
>     +        "name": "topology",
>     +        "type": "string",
>     +        "level": 10,
>     +        "mandatory": true,
>     +        "lookupSupported": true,
>     +        "recursiveSupported": false,
>     +        "excludesSupported": true,
>     +        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
>     +        "matcherOptions": {
>     +          "wildCard": "true",
>     +          "ignoreCase": "false"
>     +        },
>     +        "validationRegEx": "",
>     +        "validationMessage": "",
>     +        "uiHint": "",
>     +        "label": "Storm Topology",
>     +        "description": "Storm Topology"
>     +      }
>     +    ],
>     +    "accessTypes": [
>     +      {
>     +        "itemId": 1,
>     +        "name": "submitTopology",
>     +        "label": "Submit Topology",
>     +        "impliedGrants": [
>     +          "fileUpload",
>     +          "fileDownload"
>     +        ]
>     +      },
>     +      {
>     +        "itemId": 2,
>     +        "name": "fileUpload",
>     +        "label": "File Upload",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 3,
>     +        "name": "getNimbusConf",
>     +        "label": "Get Nimbus Conf",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 4,
>     +        "name": "getClusterInfo",
>     +        "label": "Get Cluster Info",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 5,
>     +        "name": "fileDownload",
>     +        "label": "File Download",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 6,
>     +        "name": "killTopology",
>     +        "label": "Kill Topology",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 7,
>     +        "name": "rebalance",
>     +        "label": "Rebalance",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 8,
>     +        "name": "activate",
>     +        "label": "Activate",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 9,
>     +        "name": "deactivate",
>     +        "label": "Deactivate",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 10,
>     +        "name": "getTopologyConf",
>     +        "label": "Get Topology Conf",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 11,
>     +        "name": "getTopology",
>     +        "label": "Get Topology",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 12,
>     +        "name": "getUserTopology",
>     +        "label": "Get User Topology",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 13,
>     +        "name": "getTopologyInfo",
>     +        "label": "Get Topology Info",
>     +        "impliedGrants": []
>     +      },
>     +      {
>     +        "itemId": 14,
>     +        "name": "uploadNewCredentials",
>     +        "label": "Upload New Credential",
>     +        "impliedGrants": []
>     +      }
>     +    ],
>     +    "policyConditions": [],
>     +    "contextEnrichers": [],
>     +    "enums": [],
>     +    "dataMaskDef": {
>     +      "maskTypes": [],
>     +      "accessTypes": [],
>     +      "resources": []
>     +    },
>     +    "rowFilterDef": {
>     +      "accessTypes": [],
>     +      "resources": []
>     +    },
>     +    "id": 6,
>     +    "guid": "2a60f427-edcf-4e20-834c-a9a267b5b963",
>     +    "isEnabled": true,
>     +    "createTime": "20160314-14:39:35.000-+0000",
>     +    "updateTime": "20160314-14:39:35.000-+0000",
>     +    "version": 1
>     +  }
>     +}
>     \ No newline at end of file
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7729373d/storm-agent/src/test/resources/storm.yaml
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/resources/storm.yaml b/storm-agent/src/test/resources/storm.yaml
>     new file mode 100644
>     index 0000000..a306966
>     --- /dev/null
>     +++ b/storm-agent/src/test/resources/storm.yaml
>     @@ -0,0 +1,289 @@
>     +# Licensed to the Apache Software Foundation (ASF) under one
>     +# or more contributor license agreements.  See the NOTICE file
>     +# distributed with this work for additional information
>     +# regarding copyright ownership.  The ASF licenses this file
>     +# to you under the Apache License, Version 2.0 (the
>     +# "License"); you may not use this file except in compliance
>     +# with the License.  You may obtain a copy of the License at
>     +#
>     +# http://www.apache.org/licenses/LICENSE-2.0
>     +#
>     +# Unless required by applicable law or agreed to in writing, software
>     +# distributed under the License is distributed on an "AS IS" BASIS,
>     +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>     +# See the License for the specific language governing permissions and
>     +# limitations under the License.
>     +
>     +
>     +########### These all have default values as shown
>     +########### Additional configuration goes into storm.yaml
>     +
>     +java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
>     +
>     +### storm.* configs are general configurations
>     +# the local dir is where jars are kept
>     +storm.local.dir: "storm-local"
>     +storm.log4j2.conf.dir: "log4j2"
>     +storm.zookeeper.servers:
>     +    - "localhost"
>     +storm.zookeeper.port: 2181
>     +storm.zookeeper.root: "/storm"
>     +storm.zookeeper.session.timeout: 20000
>     +storm.zookeeper.connection.timeout: 15000
>     +storm.zookeeper.retry.times: 5
>     +storm.zookeeper.retry.interval: 1000
>     +storm.zookeeper.retry.intervalceiling.millis: 30000
>     +storm.zookeeper.auth.user: null
>     +storm.zookeeper.auth.password: null
>     +storm.exhibitor.port: 8080
>     +storm.exhibitor.poll.uripath: "/exhibitor/v1/cluster/list"
>     +storm.cluster.mode: "distributed" # can be distributed or local
>     +storm.local.mode.zmq: false
>     +storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
>     +storm.principal.tolocal: "org.apache.storm.security.auth.DefaultPrincipalToLocal"
>     +storm.group.mapping.service: "org.apache.storm.security.auth.ShellBasedGroupsMapping"
>     +storm.group.mapping.service.params: null
>     +storm.messaging.transport: "org.apache.storm.messaging.netty.Context"
>     +storm.nimbus.retry.times: 5
>     +storm.nimbus.retry.interval.millis: 2000
>     +storm.nimbus.retry.intervalceiling.millis: 60000
>     +storm.auth.simple-white-list.users: []
>     +storm.auth.simple-acl.users: []
>     +storm.auth.simple-acl.users.commands: []
>     +storm.auth.simple-acl.admins: []
>     +storm.cluster.state.store: "org.apache.storm.cluster_state.zookeeper_state_factory"
>     +storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
>     +storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
>     +storm.workers.artifacts.dir: "workers-artifacts"
>     +storm.health.check.dir: "healthchecks"
>     +storm.health.check.timeout.ms: 5000
>     +
>     +### nimbus.* configs are for the master
>     +nimbus.seeds : ["localhost"]
>     +nimbus.thrift.port: 6627
>     +nimbus.thrift.threads: 64
>     +nimbus.thrift.max_buffer_size: 1048576
>     +nimbus.childopts: "-Xmx1024m"
>     +nimbus.task.timeout.secs: 30
>     +nimbus.supervisor.timeout.secs: 60
>     +nimbus.monitor.freq.secs: 10
>     +nimbus.cleanup.inbox.freq.secs: 600
>     +nimbus.inbox.jar.expiration.secs: 3600
>     +nimbus.code.sync.freq.secs: 120
>     +nimbus.task.launch.secs: 120
>     +nimbus.file.copy.expiration.secs: 600
>     +nimbus.topology.validator: "org.apache.storm.nimbus.DefaultTopologyValidator"
>     +topology.min.replication.count: 1
>     +topology.max.replication.wait.time.sec: 60
>     +nimbus.credential.renewers.freq.secs: 600
>     +nimbus.impersonation.authorizer: "org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer"
>     +# Plug in ranger nimbus.authorizer here
>     +nimbus.authorizer: "org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer"
>     +nimbus.queue.size: 100000
>     +scheduler.display.resource: false
>     +
>     +### ui.* configs are for the master
>     +ui.host: 0.0.0.0
>     +ui.port: 8080
>     +ui.childopts: "-Xmx768m"
>     +ui.actions.enabled: true
>     +ui.filter: null
>     +ui.filter.params: null
>     +ui.users: null
>     +ui.header.buffer.bytes: 4096
>     +ui.http.creds.plugin: org.apache.storm.security.auth.DefaultHttpCredentialsPlugin
>     +
>     +logviewer.port: 8000
>     +logviewer.childopts: "-Xmx128m"
>     +logviewer.cleanup.age.mins: 10080
>     +logviewer.appender.name: "A1"
>     +logviewer.max.sum.worker.logs.size.mb: 4096
>     +logviewer.max.per.worker.logs.size.mb: 2048
>     +
>     +logs.users: null
>     +
>     +drpc.port: 3772
>     +drpc.worker.threads: 64
>     +drpc.max_buffer_size: 1048576
>     +drpc.queue.size: 128
>     +drpc.invocations.port: 3773
>     +drpc.invocations.threads: 64
>     +drpc.request.timeout.secs: 600
>     +drpc.childopts: "-Xmx768m"
>     +drpc.http.port: 3774
>     +drpc.https.port: -1
>     +drpc.https.keystore.password: ""
>     +drpc.https.keystore.type: "JKS"
>     +drpc.http.creds.plugin: org.apache.storm.security.auth.DefaultHttpCredentialsPlugin
>     +drpc.authorizer.acl.filename: "drpc-auth-acl.yaml"
>     +drpc.authorizer.acl.strict: false
>     +
>     +transactional.zookeeper.root: "/transactional"
>     +transactional.zookeeper.servers: null
>     +transactional.zookeeper.port: null
>     +
>     +## blobstore configs
>     +supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
>     +supervisor.blobstore.download.thread.count: 5
>     +supervisor.blobstore.download.max_retries: 3
>     +supervisor.localizer.cache.target.size.mb: 10240
>     +supervisor.localizer.cleanup.interval.ms: 600000
>     +
>     +nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
>     +nimbus.blobstore.expiration.secs: 600
>     +
>     +storm.blobstore.inputstream.buffer.size.bytes: 65536
>     +client.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
>     +storm.blobstore.replication.factor: 3
>     +
>     +### supervisor.* configs are for node supervisors
>     +# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
>     +supervisor.slots.ports:
>     +    - 6700
>     +    - 6701
>     +    - 6702
>     +    - 6703
>     +supervisor.childopts: "-Xmx256m"
>     +supervisor.run.worker.as.user: false
>     +#how long supervisor will wait to ensure that a worker process is started
>     +supervisor.worker.start.timeout.secs: 120
>     +#how long between heartbeats until supervisor considers that worker dead and tries to restart it
>     +supervisor.worker.timeout.secs: 30
>     +#how many seconds to sleep for before shutting down threads on worker
>     +supervisor.worker.shutdown.sleep.secs: 1
>     +#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
>     +supervisor.monitor.frequency.secs: 3
>     +#how frequently the supervisor heartbeats to the cluster state (for nimbus)
>     +supervisor.heartbeat.frequency.secs: 5
>     +supervisor.enable: true
>     +supervisor.supervisors: []
>     +supervisor.supervisors.commands: []
>     +supervisor.memory.capacity.mb: 3072.0
>     +#By convention 1 cpu core should be about 100, but this can be adjusted if needed
>     +# using 100 makes it simple to set the desired value to the capacity measurement
>     +# for single threaded bolts
>     +supervisor.cpu.capacity: 400.0
>     +
>     +### worker.* configs are for task workers
>     +worker.heap.memory.mb: 768
>     +worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump"
>     +worker.gc.childopts: ""
>     +
>     +# Unlocking commercial features requires a special license from Oracle.
>     +# See http://www.oracle.com/technetwork/java/javase/terms/products/index.html
>     +# For this reason, profiler features are disabled by default.
>     +worker.profiler.enabled: false
>     +worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
>     +worker.profiler.command: "flight.bash"
>     +worker.heartbeat.frequency.secs: 1
>     +
>     +# check whether dynamic log levels can be reset from DEBUG to INFO in workers
>     +worker.log.level.reset.poll.secs: 30
>     +
>     +# control how many worker receiver threads we need per worker
>     +topology.worker.receiver.thread.count: 1
>     +
>     +task.heartbeat.frequency.secs: 3
>     +task.refresh.poll.secs: 10
>     +task.credentials.poll.secs: 30
>     +
>     +# now should be null by default
>     +topology.backpressure.enable: true
>     +backpressure.disruptor.high.watermark: 0.9
>     +backpressure.disruptor.low.watermark: 0.4
>     +
>     +zmq.threads: 1
>     +zmq.linger.millis: 5000
>     +zmq.hwm: 0
>     +
>     +
>     +storm.messaging.netty.server_worker_threads: 1
>     +storm.messaging.netty.client_worker_threads: 1
>     +storm.messaging.netty.buffer_size: 5242880 #5MB buffer
>     +# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead.
>     +storm.messaging.netty.max_retries: 300
>     +storm.messaging.netty.max_wait_ms: 1000
>     +storm.messaging.netty.min_wait_ms: 100
>     +
>     +# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
>     +storm.messaging.netty.transfer.batch.size: 262144
>     +# Sets the backlog value to specify when the channel binds to a local address
>     +storm.messaging.netty.socket.backlog: 500
>     +
>     +# By default, the Netty SASL authentication is set to false.  Users can override and set it true for a specific topology.
>     +storm.messaging.netty.authentication: false
>     +
>     +# Default plugin to use for automatic network topology discovery
>     +storm.network.topography.plugin: org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping
>     +
>     +# default number of seconds group mapping service will cache user group
>     +storm.group.mapping.service.cache.duration.secs: 120
>     +
>     +### topology.* configs are for specific executing storms
>     +topology.enable.message.timeouts: true
>     +topology.debug: false
>     +topology.workers: 1
>     +topology.acker.executors: null
>     +topology.eventlogger.executors: 0
>     +topology.tasks: null
>     +# maximum amount of time a message has to complete before it's considered failed
>     +topology.message.timeout.secs: 30
>     +topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer"
>     +topology.shellbolt.max.pending: 100
>     +topology.skip.missing.kryo.registrations: false
>     +topology.max.task.parallelism: null
>     +topology.max.spout.pending: null
>     +topology.state.synchronization.timeout.secs: 60
>     +topology.stats.sample.rate: 0.05
>     +topology.builtin.metrics.bucket.size.secs: 60
>     +topology.fall.back.on.java.serialization: true
>     +topology.worker.childopts: null
>     +topology.worker.logwriter.childopts: "-Xmx64m"
>     +topology.executor.receive.buffer.size: 1024 #batched
>     +topology.executor.send.buffer.size: 1024 #individual messages
>     +topology.transfer.buffer.size: 1024 # batched
>     +topology.tick.tuple.freq.secs: null
>     +topology.worker.shared.thread.pool.size: 4
>     +topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy"
>     +topology.sleep.spout.wait.strategy.time.ms: 1
>     +topology.error.throttle.interval.secs: 10
>     +topology.max.error.report.per.interval: 5
>     +topology.kryo.factory: "org.apache.storm.serialization.DefaultKryoFactory"
>     +topology.tuple.serializer: "org.apache.storm.serialization.types.ListDelegateSerializer"
>     +topology.trident.batch.emit.interval.millis: 500
>     +topology.testing.always.try.serialize: false
>     +topology.classpath: null
>     +topology.environment: null
>     +topology.bolts.outgoing.overflow.buffer.enable: false
>     +topology.disruptor.wait.timeout.millis: 1000
>     +topology.disruptor.batch.size: 100
>     +topology.disruptor.batch.timeout.millis: 1
>     +topology.disable.loadaware: false
>     +topology.state.checkpoint.interval.ms: 1000
>     +
>     +# Configs for Resource Aware Scheduler
>     +# topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
>     +# Recommended range of 0-29 but no hard limit set.
>     +topology.priority: 29
>     +topology.component.resources.onheap.memory.mb: 128.0
>     +topology.component.resources.offheap.memory.mb: 0.0
>     +topology.component.cpu.pcore.percent: 10.0
>     +topology.worker.max.heap.size.mb: 768.0
>     +topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
>     +resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
>     +resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
>     +
>     +dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
>     +
>     +pacemaker.host: "localhost"
>     +pacemaker.port: 6699
>     +pacemaker.base.threads: 10
>     +pacemaker.max.threads: 50
>     +pacemaker.thread.timeout: 10
>     +pacemaker.childopts: "-Xmx1024m"
>     +pacemaker.auth.method: "NONE"
>     +pacemaker.kerberos.users: []
>     +
>     +#default storm daemon metrics reporter plugins
>     +storm.daemon.metrics.reporter.plugins:
>     +     - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
>
>     http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7729373d/storm-agent/src/test/resources/words.txt
>     ----------------------------------------------------------------------
>     diff --git a/storm-agent/src/test/resources/words.txt b/storm-agent/src/test/resources/words.txt
>     new file mode 100644
>     index 0000000..c7725df
>     --- /dev/null
>     +++ b/storm-agent/src/test/resources/words.txt
>     @@ -0,0 +1,27 @@
>     +# Licensed to the Apache Software Foundation (ASF) under one
>     +# or more contributor license agreements.  See the NOTICE file
>     +# distributed with this work for additional information
>     +# regarding copyright ownership.  The ASF licenses this file
>     +# to you under the Apache License, Version 2.0 (the
>     +# "License"); you may not use this file except in compliance
>     +# with the License.  You may obtain a copy of the License at
>     +#
>     +# http://www.apache.org/licenses/LICENSE-2.0
>     +#
>     +# Unless required by applicable law or agreed to in writing, software
>     +# distributed under the License is distributed on an "AS IS" BASIS,
>     +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>     +# See the License for the specific language governing permissions and
>     +# limitations under the License.
>     +the
>     +and
>     +storm
>     +random
>     +word
>     +spout
>     +test
>     +apache
>     +the
>     +spoon
>     +and
>     +the
>     \ No newline at end of file
>


-- 
Colm O hEigeartaigh

Talend Community Coder
http://coders.talend.com
