http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Constants.java b/storm-core/src/jvm/backtype/storm/Constants.java
deleted file mode 100644
index 35c252f..0000000
--- a/storm-core/src/jvm/backtype/storm/Constants.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm;
-
-import backtype.storm.coordination.CoordinatedBolt;
-import clojure.lang.RT;
-
-
-public class Constants {
-    public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
-
-    public static final long SYSTEM_TASK_ID = -1;
-    public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
-    public static final String SYSTEM_COMPONENT_ID = "__system";
-    public static final String SYSTEM_TICK_STREAM_ID = "__tick";
-    public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
-    public static final String METRICS_STREAM_ID = "__metrics";
-    public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
-    public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
-}
-    
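
For context, a minimal sketch (illustrative, not part of this change) of how a bolt typically uses these constants to recognize system tick tuples, assuming the standard Tuple accessors getSourceComponent() and getSourceStreamId():

    import backtype.storm.Constants;
    import backtype.storm.tuple.Tuple;

    public class TickAwareBolt {
        // True if the tuple was emitted by the system component on the tick stream.
        public static boolean isTickTuple(Tuple tuple) {
            return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                    && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }
    }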

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ICredentialsListener.java b/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
deleted file mode 100644
index 1a7bc1b..0000000
--- a/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm;
-
-import java.util.Map;
-
-/**
- * Allows a bolt or a spout to be informed when the credentials of the topology have changed.
- */
-public interface ICredentialsListener {
-    /**
-     * Called when the credentials of a topology have changed.
-     * @param credentials the new credentials, could be null.
-     */
-    public void setCredentials(Map<String,String> credentials);
-}
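
For context, a minimal sketch (illustrative, not part of this change) of a component implementing this interface; the credential key name "my.service.token" is hypothetical:

    import java.util.Map;

    import backtype.storm.ICredentialsListener;

    public class CredentialAwareComponent implements ICredentialsListener {
        private volatile String accessToken;

        @Override
        public void setCredentials(Map<String, String> credentials) {
            // Called whenever new credentials are pushed to the running topology.
            if (credentials != null) {
                this.accessToken = credentials.get("my.service.token");
            }
        }
    }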

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ILocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ILocalCluster.java b/storm-core/src/jvm/backtype/storm/ILocalCluster.java
deleted file mode 100644
index 7d5aa35..0000000
--- a/storm-core/src/jvm/backtype/storm/ILocalCluster.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.Credentials;
-
-import java.util.Map;
-
-
-public interface ILocalCluster {
-    void submitTopology(String topologyName, Map conf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException;
-    void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, InvalidTopologyException;
-    void uploadNewCredentials(String topologyName, Credentials creds);
-    void killTopology(String topologyName) throws NotAliveException;
-    void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException;
-    void activate(String topologyName) throws NotAliveException;
-    void deactivate(String topologyName) throws NotAliveException;
-    void rebalance(String name, RebalanceOptions options) throws NotAliveException;
-    void shutdown();
-    String getTopologyConf(String id);
-    StormTopology getTopology(String id);
-    ClusterSummary getClusterInfo();
-    TopologyInfo getTopologyInfo(String id);
-    Map getState();
-}
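
For context, a minimal sketch (illustrative, not part of this change) of driving this interface through the LocalCluster implementation that ships with storm-core; the topology and timings are placeholders:

    import backtype.storm.Config;
    import backtype.storm.ILocalCluster;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.StormTopology;

    public class LocalClusterExample {
        public static void run(StormTopology topology) throws Exception {
            ILocalCluster cluster = new LocalCluster(); // in-process test cluster
            try {
                Config conf = new Config();
                conf.setDebug(true);
                cluster.submitTopology("test-topology", conf, topology);
                Thread.sleep(10000); // let the topology run briefly
                cluster.killTopology("test-topology");
            } finally {
                cluster.shutdown();
            }
        }
    }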

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ILocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ILocalDRPC.java b/storm-core/src/jvm/backtype/storm/ILocalDRPC.java
deleted file mode 100644
index e478dca..0000000
--- a/storm-core/src/jvm/backtype/storm/ILocalDRPC.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm;
-
-import backtype.storm.daemon.Shutdownable;
-import backtype.storm.generated.DistributedRPC;
-import backtype.storm.generated.DistributedRPCInvocations;
-
-
-public interface ILocalDRPC extends DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
-    public String getServiceId();    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ISubmitterHook.java b/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
deleted file mode 100644
index 331c88f..0000000
--- a/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.TopologyInfo;
-
-import java.util.Map;
-
-/**
- * if the FQCN of an implementation of this class is specified by setting the config storm.topology.submission.notifier.plugin.class,
- * that class's notify method will be invoked when a topology is successfully submitted via the StormSubmitter class.
- */
-public interface ISubmitterHook {
-    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology topology) throws IllegalAccessException;
-}
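
For context, a minimal sketch (illustrative, not part of this change) of an implementation; enabling it requires setting storm.topology.submission.notifier.plugin.class to the implementation's FQCN:

    import java.util.Map;

    import backtype.storm.ISubmitterHook;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.generated.TopologyInfo;

    public class LoggingSubmitterHook implements ISubmitterHook {
        @Override
        public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology topology) {
            // Invoked by StormSubmitter after a successful submission.
            System.out.println("Submitted topology " + topologyInfo.get_name()
                    + " with id " + topologyInfo.get_id());
        }
    }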

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/LogWriter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/LogWriter.java b/storm-core/src/jvm/backtype/storm/LogWriter.java
deleted file mode 100644
index 849f5ca..0000000
--- a/storm-core/src/jvm/backtype/storm/LogWriter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.InputStream;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Launches a sub process and writes its output out to logs.
- */
-public class LogWriter extends Thread {
-    private Logger logger;
-    private BufferedReader in;
-
-    public LogWriter(InputStream in, Logger logger) {
-        this.in = new BufferedReader(new InputStreamReader(in));
-        this.logger = logger;
-    }
-
-    public void run() {
-        Logger logger = this.logger;
-        BufferedReader in = this.in;
-        String line;
-        try {
-            while ((line = in.readLine()) != null) {
-                logger.info(line);
-            }
-        } catch (IOException e) {
-            logger.error("Internal ERROR", e);
-        } finally {
-            try {
-                in.close();
-            } catch (IOException e) {
-                logger.error("Internal ERROR", e);
-            }
-        }
-    }
-
-    public void close() throws Exception {
-        this.join();
-    }
-
-    public static void main(String [] args) throws Exception {
-        ProcessBuilder pb = new ProcessBuilder(args);
-        Process p = pb.start();
-        LogWriter err = null;
-        LogWriter in = null;
-        int ret = -1;
-        try {
-            Logger logger = LoggerFactory.getLogger("STDERR");
-            err = new LogWriter(p.getErrorStream(), logger);
-            err.start();
-            in = new LogWriter(p.getInputStream(), logger);
-            in.start();
-            ret = p.waitFor();
-        } finally {
-          if (err != null) err.close();
-          if (in != null) in.close();
-        }
-        System.exit(ret);
-    }
-}
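
For context, a minimal sketch (illustrative, not part of this change) of wiring LogWriter to an arbitrary child process, mirroring what main() above does; the command is a placeholder:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import backtype.storm.LogWriter;

    public class RunAndLog {
        public static void main(String[] args) throws Exception {
            Process p = new ProcessBuilder("ls", "-l").start();
            Logger logger = LoggerFactory.getLogger("CHILD");
            LogWriter out = new LogWriter(p.getInputStream(), logger);
            LogWriter err = new LogWriter(p.getErrorStream(), logger);
            out.start();
            err.start();
            int exit = p.waitFor();  // wait for the child to finish
            out.close();             // join the reader threads
            err.close();
            logger.info("child exited with code {}", exit);
        }
    }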

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
deleted file mode 100644
index 725b0b1..0000000
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.HashMap;
-import java.util.Map;
-
-import backtype.storm.scheduler.resource.ResourceUtils;
-import backtype.storm.validation.ConfigValidation;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.security.auth.IAutoCredentials;
-import backtype.storm.security.auth.AuthUtils;
-import backtype.storm.generated.*;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Use this class to submit topologies to run on the Storm cluster. You should run your program
- * with the "storm jar" command from the command-line, and then use this class to
- * submit your topologies.
- */
-public class StormSubmitter {
-    public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
-
-    private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
-
-    private static ILocalCluster localNimbus = null;
-
-    private static String generateZookeeperDigestSecretPayload() {
-        return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
-    }
-
-    public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+");
-
-    public static boolean validateZKDigestPayload(String payload) {
-        if (payload != null) {
-            Matcher m = zkDigestPattern.matcher(payload);
-            return m.matches();
-        }
-        return false;
-    }
-
-    @SuppressWarnings("unchecked")
-    public static Map prepareZookeeperAuthentication(Map conf) {
-        Map toRet = new HashMap();
-
-        // Is the topology ZooKeeper authentication configuration unset?
-        if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
-                conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null ||
-                !  validateZKDigestPayload((String)
-                    conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) {
-
-            String secretPayload = generateZookeeperDigestSecretPayload();
-            toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload);
-            LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload);
-        }
-
-        // This should always be set to digest.
-        toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
-
-        return toRet;
-    }
-
-    private static Map<String,String> populateCredentials(Map conf, Map<String, String> creds) {
-        Map<String,String> ret = new HashMap<>();
-        for (IAutoCredentials autoCred: AuthUtils.GetAutoCredentials(conf)) {
-            LOG.info("Running "+autoCred);
-            autoCred.populateCredentials(ret);
-        }
-        if (creds != null) {
-            ret.putAll(creds);
-        }
-        return ret;
-    }
-
-    /**
-     * Push a new set of credentials to the running topology.
-     * @param name the name of the topology to push credentials to.
-     * @param stormConf the topology-specific configuration, if desired. See {@link Config}.
-     * @param credentials the credentials to push.
-     * @throws AuthorizationException if you are not authorized to push credentials.
-     * @throws NotAliveException if the topology is not alive
-     * @throws InvalidTopologyException if any other error happens
-     */
-    public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials)
-            throws AuthorizationException, NotAliveException, InvalidTopologyException {
-        stormConf = new HashMap(stormConf);
-        stormConf.putAll(Utils.readCommandLineOpts());
-        Map conf = Utils.readStormConfig();
-        conf.putAll(stormConf);
-        Map<String,String> fullCreds = populateCredentials(conf, credentials);
-        if (fullCreds.isEmpty()) {
-            LOG.warn("No credentials were found to push to " + name);
-            return;
-        }
-        try {
-            if(localNimbus!=null) {
-                LOG.info("Pushing Credentials to topology " + name + " in 
local mode");
-                localNimbus.uploadNewCredentials(name, new 
Credentials(fullCreds));
-            } else {
-                NimbusClient client = NimbusClient.getConfiguredClient(conf);
-                try {
-                    LOG.info("Uploading new credentials to " +  name);
-                    client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
-                } finally {
-                    client.close();
-                }
-            }
-            LOG.info("Finished submitting topology: " +  name);
-        } catch(TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-
-    /**
-     * Submits a topology to run on the cluster. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     * @throws AuthorizationException if authorization fails
-     */
-    public static void submitTopology(String name, Map stormConf, StormTopology topology)
-            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-        submitTopology(name, stormConf, topology, null, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster. A topology runs forever or until
-     * explicitly killed.
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     * @throws AuthorizationException if authorization fails
-     */
-    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
-            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-        submitTopology(name, stormConf, topology, opts, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster as a particular user. A topology runs forever or until
-     * explicitly killed.
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology.
-     * @param progressListener to track the progress of the jar upload process
-     * @param asUser The user as which this topology should be submitted.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     * @throws AuthorizationException if authorization fails
-     * @throws IllegalArgumentException thrown if configs will yield an unschedulable topology. validateConfs validates confs
-     */
-    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
-            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
-        if(!Utils.isValidConf(stormConf)) {
-            throw new IllegalArgumentException("Storm conf is not valid. Must 
be json-serializable");
-        }
-        stormConf = new HashMap(stormConf);
-        stormConf.putAll(Utils.readCommandLineOpts());
-        Map conf = Utils.readStormConfig();
-        conf.putAll(stormConf);
-        stormConf.putAll(prepareZookeeperAuthentication(conf));
-
-        validateConfs(conf, topology);
-
-        Map<String,String> passedCreds = new HashMap<>();
-        if (opts != null) {
-            Credentials tmpCreds = opts.get_creds();
-            if (tmpCreds != null) {
-                passedCreds = tmpCreds.get_creds();
-            }
-        }
-        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
-        if (!fullCreds.isEmpty()) {
-            if (opts == null) {
-                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
-            }
-            opts.set_creds(new Credentials(fullCreds));
-        }
-        try {
-            if(localNimbus!=null) {
-                LOG.info("Submitting topology " + name + " in local mode");
-                if(opts!=null) {
-                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
-                } else {
-                    // this is for backwards compatibility
-                    localNimbus.submitTopology(name, stormConf, topology);
-                }
-            } else {
-                String serConf = JSONValue.toJSONString(stormConf);
-                NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
-                if(topologyNameExists(conf, name, asUser)) {
-                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
-                }
-                String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
-                try {
-                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
-                    if(opts!=null) {
-                        client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
-                    } else {
-                        // this is for backwards compatibility
-                        client.getClient().submitTopology(name, jar, serConf, topology);
-                    }
-                    }
-                } catch(InvalidTopologyException e) {
-                    LOG.warn("Topology submission exception: "+e.get_msg());
-                    throw e;
-                } catch(AlreadyAliveException e) {
-                    LOG.warn("Topology already alive exception", e);
-                    throw e;
-                } finally {
-                    client.close();
-                }
-            }
-            LOG.info("Finished submitting topology: " +  name);
-        } catch(TException e) {
-            throw new RuntimeException(e);
-        }
-        invokeSubmitterHook(name, asUser, conf, topology);
-
-    }
-
-    private static void invokeSubmitterHook(String name, String asUser, Map stormConf, StormTopology topology) {
-        try {
-            if (stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) {
-                ISubmitterHook submitterHook = (ISubmitterHook) Class.forName(stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString()).newInstance();
-                TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, stormConf);
-                submitterHook.notify(topologyInfo, stormConf, topology);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Submits a topology to run on the cluster. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology
-     * @param progressListener to track the progress of the jar upload process
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     * @throws AuthorizationException if authorization fails
-     */
-    @SuppressWarnings("unchecked")
-    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
-             ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-        submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     * @throws AuthorizationException if authorization fails
-     */
-
-    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-        submitTopologyWithProgressBar(name, stormConf, topology, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     * @throws AuthorizationException if authorization fails
-     */
-
-    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-        // show a progress bar so we know we're not stuck (especially on slow connections)
-        submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
-            @Override
-            public void onStart(String srcFile, String targetFile, long totalBytes) {
-                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
-            }
-
-            @Override
-            public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
-                int length = 50;
-                int p = (int)((length * bytesUploaded) / totalBytes);
-                String progress = StringUtils.repeat("=", p);
-                String todo = StringUtils.repeat(" ", length - p);
-
-                System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
-            }
-
-            @Override
-            public void onCompleted(String srcFile, String targetFile, long totalBytes) {
-                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
-            }
-        });
-    }
-
-    private static boolean topologyNameExists(Map conf, String name, String asUser) {
-        NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
-        try {
-            ClusterSummary summary = client.getClient().getClusterInfo();
-            for(TopologySummary s : summary.get_topologies()) {
-                if(s.get_name().equals(name)) {
-                    return true;
-                }
-            }
-            return false;
-
-        } catch(Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            client.close();
-        }
-    }
-
-    private static String submitJar(Map conf, ProgressListener listener) {
-        return  submitJar(conf, System.getProperty("storm.jar"), listener);
-    }
-
-    /**
-     * Submit jar file
-     * @param conf the topology-specific configuration. See {@link Config}.
-     * @param localJar file path of the jar file to submit
-     * @return the remote location of the submitted jar
-     */
-    public static String submitJar(Map conf, String localJar) {
-        return submitJar(conf, localJar, null);
-    }
-
-
-    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
-        if (localJar == null) {
-            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
-        }
-
-        NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
-        try {
-            String uploadLocation = client.getClient().beginFileUpload();
-            LOG.info("Uploading topology jar " + localJar + " to assigned 
location: " + uploadLocation);
-            BufferFileInputStream is = new BufferFileInputStream(localJar, 
THRIFT_CHUNK_SIZE_BYTES);
-
-            long totalSize = new File(localJar).length();
-            if (listener != null) {
-                listener.onStart(localJar, uploadLocation, totalSize);
-            }
-
-            long bytesUploaded = 0;
-            while(true) {
-                byte[] toSubmit = is.read();
-                bytesUploaded += toSubmit.length;
-                if (listener != null) {
-                    listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
-                }
-
-                if(toSubmit.length==0) break;
-                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
-            }
-            client.getClient().finishFileUpload(uploadLocation);
-
-            if (listener != null) {
-                listener.onCompleted(localJar, uploadLocation, totalSize);
-            }
-
-            LOG.info("Successfully uploaded topology jar to assigned location: 
" + uploadLocation);
-            return uploadLocation;
-        } catch(Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            client.close();
-        }
-    }
-
-    /**
-     * Submit jar file
-     * @param conf the topology-specific configuration. See {@link Config}.
-     * @param localJar file path of the jar file to submit
-     * @param listener progress listener to track the jar file upload
-     * @return the remote location of the submitted jar
-     */
-    public static String submitJar(Map conf, String localJar, ProgressListener listener) {
-        return submitJarAs(conf,localJar, listener, null);
-    }
-
-    /**
-     * Interface use to track progress of file upload
-     */
-    public interface ProgressListener {
-        /**
-         * called before file is uploaded
-         * @param srcFile - jar file to be uploaded
-         * @param targetFile - destination file
-         * @param totalBytes - total number of bytes of the file
-         */
-        public void onStart(String srcFile, String targetFile, long totalBytes);
-
-        /**
-         * called whenever a chunk of bytes is uploaded
-         * @param srcFile - jar file to be uploaded
-         * @param targetFile - destination file
-         * @param bytesUploaded - number of bytes transferred so far
-         * @param totalBytes - total number of bytes of the file
-         */
-        public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
-
-        /**
-         * called when the file is uploaded
-         * @param srcFile - jar file to be uploaded
-         * @param targetFile - destination file
-         * @param totalBytes - total number of bytes of the file
-         */
-        public void onCompleted(String srcFile, String targetFile, long totalBytes);
-    }
-
-    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
-        ConfigValidation.validateFields(stormConf);
-        validateTopologyWorkerMaxHeapSizeMBConfigs(stormConf, topology);
-    }
-
-    private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map stormConf, StormTopology topology) {
-        double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
-        Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
-        if(topologyWorkerMaxHeapSize < largestMemReq) {
-            throw new IllegalArgumentException("Topology will not be able to 
be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
-                    
+Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < "
-                    + largestMemReq + " (Largest memory requirement of a 
component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a 
larger amount");
-        }
-    }
-
-    private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
-        double largestMemoryOperator = 0.0;
-        for(Map<String, Double> entry : ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
-            double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
-                    + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-            if(memoryRequirement > largestMemoryOperator) {
-                largestMemoryOperator = memoryRequirement;
-            }
-        }
-        for(Map<String, Double> entry : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
-            double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
-                    + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-            if(memoryRequirement > largestMemoryOperator) {
-                largestMemoryOperator = memoryRequirement;
-            }
-        }
-        return largestMemoryOperator;
-    }
-}
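
For context, a minimal sketch (illustrative, not part of this change) of a typical submission; the topology name, worker count, and builder calls are placeholders, and the program is expected to be launched via `storm jar` so that the storm.jar system property is set:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class SubmitExample {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // builder.setSpout(...) and builder.setBolt(...) omitted for brevity
            Config conf = new Config();
            conf.setNumWorkers(2);
            StormSubmitter.submitTopologyWithProgressBar("example-topology", conf, builder.createTopology());
        }
    }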

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
deleted file mode 100644
index f35b7a7..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * An output stream where all of the data is committed on close,
- * or can be canceled with cancel.
- */
-public abstract class AtomicOutputStream extends OutputStream {
-    /**
-     * Cancel all of the writes associated with this stream and close it.
-     */ 
-    public abstract void cancel() throws IOException;
-}
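
For context, a minimal sketch (illustrative, not part of this change) of the commit-on-close contract a caller is expected to follow: data becomes visible only if close() succeeds, and cancel() discards partial writes:

    import java.io.IOException;

    import backtype.storm.blobstore.AtomicOutputStream;

    public class AtomicWriteExample {
        public static void writeAtomically(AtomicOutputStream out, byte[] data) throws IOException {
            boolean committed = false;
            try {
                out.write(data);
                out.close();       // close commits the data
                committed = true;
            } finally {
                if (!committed) {
                    out.cancel();  // roll back partial writes on any failure
                }
            }
        }
    }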

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
deleted file mode 100644
index 53cfa15..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-public class BlobKeySequenceInfo {
-    private String nimbusHostPort;
-    private String sequenceNumber;
-
-    public void setNimbusHostPort(String nimbusHostPort) {
-     this.nimbusHostPort = nimbusHostPort;
-    }
-
-    public void setSequenceNumber(String sequenceNumber) {
-        this.sequenceNumber = sequenceNumber;
-    }
-
-    public String getNimbusHostPort() {
-        return nimbusHostPort;
-    }
-
-    public String getSequenceNumber() {
-        return sequenceNumber;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
deleted file mode 100644
index 16a408e..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import javax.security.auth.Subject;
-
-import backtype.storm.nimbus.NimbusInfo;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.daemon.Shutdownable;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.generated.KeyAlreadyExistsException;
-import backtype.storm.generated.ReadableBlobMeta;
-import backtype.storm.generated.SettableBlobMeta;
-
-/**
- * Provides a way to store blobs that can be downloaded.
- * Blobs must be able to be uploaded and listed from Nimbus,
- * and downloaded from the Supervisors. It is a key-value based
- * store, where the key is a string and the value is the blob data.
- *
- * ACL checking must take place against the provided subject.
- * If the blob store does not support Security it must validate
- * that all ACLs set are always WORLD, everything.
- *
- * The users can upload their blobs through the blob store command
- * line. The command line also allows us to update and delete blobs.
- *
- * Modifying the replication factor only works for HdfsBlobStore
- * as for the LocalFsBlobStore the replication is dependent on
- * the number of Nimbodes available.
- */
-public abstract class BlobStore implements Shutdownable {
-    private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
-    private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
-    protected static final String BASE_BLOBS_DIR_NAME = "blobs";
-
-    /**
-     * Allows us to initialize the blob store
-     * @param conf The storm configuration
-     * @param baseDir The directory path to store the blobs
-     * @param nimbusInfo Contains the nimbus host, port and leadership information.
-     */
-    public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
-
-    /**
-     * Creates the blob.
-     * @param key Key for the blob.
-     * @param meta Metadata which contains the acls information
-     * @param who Is the subject creating the blob.
-     * @return AtomicOutputStream returns a stream into which the data
-     * can be written.
-     * @throws AuthorizationException
-     * @throws KeyAlreadyExistsException
-     */
-    public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
-
-    /**
-     * Updates the blob data.
-     * @param key Key for the blob.
-     * @param who Is the subject having the write privilege for the blob.
-     * @return AtomicOutputStream returns a stream into which the data
-     * can be written.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Gets the current version of metadata for a blob
-     * to be viewed by the user or downloaded by the supervisor.
-     * @param key Key for the blob.
-     * @param who Is the subject having the read privilege for the blob.
-     * @return ReadableBlobMeta containing the metadata
-     * for the blob, including its version.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Sets the metadata with renewed acls for the blob.
-     * @param key Key for the blob.
-     * @param meta Metadata which contains the updated
-     * acls information.
-     * @param who Is the subject having the write privilege for the blob.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Deletes the blob data and metadata.
-     * @param key Key for the blob.
-     * @param who Is the subject having write privilege for the blob.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Gets the InputStream to read the blob details
-     * @param key Key for the blob.
-     * @param who Is the subject having the read privilege for the blob.
-     * @return InputStreamWithMeta has the additional
-     * file length and version information.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Returns an iterator over all the
-     * keys currently available on the blob store.
-     * @return Iterator<String>
-     */
-    public abstract Iterator<String> listKeys();
-
-    /**
-     * Gets the replication factor of the blob.
-     * @param key Key for the blob.
-     * @param who Is the subject having the read privilege for the blob.
-     * @return the replication factor for the blob.
-     * @throws Exception
-     */
-    public abstract int getBlobReplication(String key, Subject who) throws Exception;
-
-    /**
-     * Modifies the replication factor of the blob.
-     * @param key Key for the blob.
-     * @param replication The replication factor to set for the blob.
-     * @param who Is the subject having the update privilege for the blob
-     * @return the updated replication factor for the blob.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     * @throws IOException
-     */
-    public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
-
-    /**
-     * Filters keys based on the KeyFilter
-     * passed as the argument.
-     * @param filter KeyFilter
-     * @param <R> Type
-     * @return Set of filtered keys
-     */
-    public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
-        Set<R> ret = new HashSet<R>();
-        Iterator<String> keys = listKeys();
-        while (keys.hasNext()) {
-            String key = keys.next();
-            R filtered = filter.filter(key);
-            if (filtered != null) {
-                ret.add(filtered);
-            }
-        }
-        return ret;
-    }
-
-    /**
-     * Validates key checking for potentially harmful patterns
-     * @param key Key for the blob.
-     */
-    public static final void validateKey(String key) throws AuthorizationException {
-        if (StringUtils.isEmpty(key) || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
-            LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN);
-            throw new AuthorizationException(key+" does not appear to be a valid blob key");
-        }
-    }
-
-    /**
-     * Wrapper called to create the blob which contains
-     * the byte data
-     * @param key Key for the blob.
-     * @param data Byte data that needs to be uploaded.
-     * @param meta Metadata which contains the acls information
-     * @param who Is the subject creating the blob.
-     * @throws AuthorizationException
-     * @throws KeyAlreadyExistsException
-     * @throws IOException
-     */
-    public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
-        AtomicOutputStream out = null;
-        try {
-            out = createBlob(key, meta, who);
-            out.write(data);
-            out.close();
-            out = null;
-        } finally {
-            if (out != null) {
-                out.cancel();
-            }
-        }
-    }
-
-    /**
-     * Wrapper called to create the blob which contains
-     * the byte data
-     * @param key Key for the blob.
-     * @param in InputStream from which the data is read to be
-     * written as a part of the blob.
-     * @param meta Metadata which contains the acls information
-     * @param who Is the subject creating the blob.
-     * @throws AuthorizationException
-     * @throws KeyAlreadyExistsException
-     * @throws IOException
-     */
-    public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
-        AtomicOutputStream out = null;
-        try {
-            out = createBlob(key, meta, who);
-            byte[] buffer = new byte[2048];
-            int len = 0;
-            while ((len = in.read(buffer)) > 0) {
-                out.write(buffer, 0, len);
-            }
-            out.close();
-        } catch (AuthorizationException | IOException | RuntimeException e) {
-            if (out != null) {
-                out.cancel();
-            }
-            throw e; // rethrow after cancelling so callers observe the failure
-        } finally {
-            in.close();
-        }
-    }
-
-    /**
-     * Reads the blob from the blob store
-     * and writes it into the output stream.
-     * @param key Key for the blob.
-     * @param out Output stream
-     * @param who Is the subject having read
-     * privilege for the blob.
-     * @throws IOException
-     * @throws KeyNotFoundException
-     * @throws AuthorizationException
-     */
-    public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
-        InputStreamWithMeta in = getBlob(key, who);
-        if (in == null) {
-            throw new IOException("Could not find " + key);
-        }
-        byte[] buffer = new byte[2048];
-        int len = 0;
-        try{
-            while ((len = in.read(buffer)) > 0) {
-                out.write(buffer, 0, len);
-            }
-        } finally {
-            in.close();
-            out.flush();
-        }
-    }
-
-    /**
-     * Wrapper around readBlobTo which reads
-     * the blob and returns its contents as a byte array.
-     * @param key  Key for the blob.
-     * @param who Is the subject having
-     * the read privilege for the blob.
-     * @return byte array containing the blob data
-     * @throws IOException
-     * @throws KeyNotFoundException
-     * @throws AuthorizationException
-     */
-    public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        readBlobTo(key, out, who);
-        byte[] bytes = out.toByteArray();
-        out.close();
-        return bytes;
-    }
-
-    /**
-     * Output stream implementation used for writing the
-     * metadata and data information.
-     */
-    protected class BlobStoreFileOutputStream extends AtomicOutputStream {
-        private BlobStoreFile part;
-        private OutputStream out;
-
-        public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
-            this.part = part;
-            this.out = part.getOutputStream();
-        }
-
-        @Override
-        public void close() throws IOException {
-            try {
-                //close means commit
-                out.close();
-                part.commit();
-            } catch (IOException | RuntimeException e) {
-                cancel();
-                throw e;
-            }
-        }
-
-        @Override
-        public void cancel() throws IOException {
-            try {
-                out.close();
-            } finally {
-                part.cancel();
-            }
-        }
-
-        @Override
-        public void write(int b) throws IOException {
-            out.write(b);
-        }
-
-        @Override
-        public void write(byte []b) throws IOException {
-            out.write(b);
-        }
-
-        @Override
-        public void write(byte []b, int offset, int len) throws IOException {
-            out.write(b, offset, len);
-        }
-    }
-
-    /**
-     * Input stream implementation used for reading
-     * both the metadata containing the acl information
-     * and the blob data.
-     */
-    protected class BlobStoreFileInputStream extends InputStreamWithMeta {
-        private BlobStoreFile part;
-        private InputStream in;
-
-        public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
-            this.part = part;
-            this.in = part.getInputStream();
-        }
-
-        @Override
-        public long getVersion() throws IOException {
-            return part.getModTime();
-        }
-
-        @Override
-        public int read() throws IOException {
-            return in.read();
-        }
-
-        @Override
-        public int read(byte[] b, int off, int len) throws IOException {
-            return in.read(b, off, len);
-        }
-
-        @Override
-        public int read(byte[] b) throws IOException {
-            return in.read(b);
-        }
-
-        @Override
-        public int available() throws IOException {
-            return in.available();
-        }
-
-        @Override
-        public long getFileLength() throws IOException {
-            return part.getFileLength();
-        }
-    }
-
-    /**
-     * Blob store implements its own version of iterator
-     * to list the blobs
-     */
-    public static class KeyTranslationIterator implements Iterator<String> {
-        private Iterator<String> it = null;
-        private String next = null;
-        private String prefix = null;
-
-        public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
-            this.it = it;
-            this.prefix = prefix;
-            primeNext();
-        }
-
-        private void primeNext() {
-            next = null;
-            while (it.hasNext()) {
-                String tmp = it.next();
-                if (tmp.startsWith(prefix)) {
-                    next = tmp.substring(prefix.length());
-                    return;
-                }
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return next != null;
-        }
-
-        @Override
-        public String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            String current = next;
-            primeNext();
-            return current;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Delete Not Supported");
-        }
-    }
-}
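
For context, a minimal sketch (illustrative, not part of this change) of a round trip against this API; the store instance, subject, and key are supplied by the caller:

    import java.util.Iterator;

    import javax.security.auth.Subject;

    import backtype.storm.blobstore.BlobStore;
    import backtype.storm.blobstore.BlobStoreAclHandler;
    import backtype.storm.generated.SettableBlobMeta;

    public class BlobStoreExample {
        public static void roundTrip(BlobStore store, Subject who) throws Exception {
            // Create a blob readable/writable by everyone (WORLD_EVERYTHING acl).
            SettableBlobMeta meta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
            store.createBlob("example-key", "hello".getBytes("UTF-8"), meta, who);
            byte[] back = store.readBlob("example-key", who);
            System.out.println(new String(back, "UTF-8"));
            for (Iterator<String> it = store.listKeys(); it.hasNext(); ) {
                System.out.println("key: " + it.next());
            }
            store.deleteBlob("example-key", who);
        }
    }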

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
deleted file mode 100644
index c0c4e5c..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AccessControl;
-import backtype.storm.generated.AccessControlType;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.security.auth.AuthUtils;
-import backtype.storm.security.auth.IPrincipalToLocal;
-import backtype.storm.security.auth.NimbusPrincipal;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Provides common handling of ACLs for blob stores.
- * Also contains some static utility functions related to blob stores.
- */
-public class BlobStoreAclHandler {
-    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
-    private final IPrincipalToLocal _ptol;
-
-    public static final int READ = 0x01;
-    public static final int WRITE = 0x02;
-    public static final int ADMIN = 0x04;
-    public static final List<AccessControl> WORLD_EVERYTHING =
-            Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
-    public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
-    private Set<String> _supervisors;
-    private Set<String> _admins;
-
-    public BlobStoreAclHandler(Map conf) {
-        _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
-        _supervisors = new HashSet<String>();
-        _admins = new HashSet<String>();
-        if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
-            _supervisors.addAll((List<String>) conf.get(Config.NIMBUS_SUPERVISOR_USERS));
-        }
-        if (conf.containsKey(Config.NIMBUS_ADMINS)) {
-            _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS));
-        }
-    }
-
-    private static AccessControlType parseACLType(String type) {
-        if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
-            return AccessControlType.OTHER;
-        } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) {
-            return AccessControlType.USER;
-        }
-        throw new IllegalArgumentException(type + " is not a valid access control type");
-    }
-
-    private static int parseAccess(String access) {
-        int ret = 0;
-        for (char c: access.toCharArray()) {
-            if ('r' == c) {
-                ret = ret | READ;
-            } else if ('w' == c) {
-                ret = ret | WRITE;
-            } else if ('a' == c) {
-                ret = ret | ADMIN;
-            } else if ('-' == c) {
-                //ignored
-            } else {
-                throw new IllegalArgumentException("'" + c + "' is not a valid access mode character");
-            }
-        }
-        return ret;
-    }
-
-    public static AccessControl parseAccessControl(String str) {
-        String[] parts = str.split(":");
-        String type = "other";
-        String name = "";
-        String access = "-";
-        if (parts.length > 3) {
-            throw new IllegalArgumentException("Don't know how to parse " + str + " into an ACL value");
-        } else if (parts.length == 1) {
-            type = "other";
-            name = "";
-            access = parts[0];
-        } else if (parts.length == 2) {
-            type = "user";
-            name = parts[0];
-            access = parts[1];
-        } else if (parts.length == 3) {
-            type = parts[0];
-            name = parts[1];
-            access = parts[2];
-        }
-        AccessControl ret = new AccessControl();
-        ret.set_type(parseACLType(type));
-        ret.set_name(name);
-        ret.set_access(parseAccess(access));
-        return ret;
-    }
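For reference, parseAccessControl accepts one-, two-, or three-part colon-separated strings; missing parts default to type "other" and an empty name. A small sketch of how the branches above map inputs to AccessControl values (user names hypothetical; accessControlToString is defined just below):

    AccessControl worldRead = BlobStoreAclHandler.parseAccessControl("r--");       // OTHER, READ
    AccessControl aliceAll  = BlobStoreAclHandler.parseAccessControl("alice:rwa"); // USER "alice", READ|WRITE|ADMIN
    AccessControl bobRw     = BlobStoreAclHandler.parseAccessControl("u:bob:rw-"); // USER "bob", READ|WRITE
    System.out.println(BlobStoreAclHandler.accessControlToString(aliceAll));       // "u:alice:rwa"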
-
-    private static String accessToString(int access) {
-        StringBuilder ret = new StringBuilder();
-        ret.append(((access & READ) > 0) ? "r" : "-");
-        ret.append(((access & WRITE) > 0) ? "w" : "-");
-        ret.append(((access & ADMIN) > 0) ? "a" : "-");
-        return ret.toString();
-    }
-
-    public static String accessControlToString(AccessControl ac) {
-        StringBuilder ret = new StringBuilder();
-        switch(ac.get_type()) {
-            case OTHER:
-                ret.append("o");
-                break;
-            case USER:
-                ret.append("u");
-                break;
-            default:
-                throw new IllegalArgumentException("Don't know what a type of " + ac.get_type() + " means ");
-        }
-        ret.append(":");
-        if (ac.is_set_name()) {
-            ret.append(ac.get_name());
-        }
-        ret.append(":");
-        ret.append(accessToString(ac.get_access()));
-        return ret.toString();
-    }
-
-    public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException {
-        Set<String> aclUsers = new HashSet<>();
-        List<String> duplicateUsers = new ArrayList<>();
-        for (AccessControl acl : acls) {
-            String aclUser = acl.get_name();
-            if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) {
-                LOG.error("'{}' user can't appear more than once in the ACLs", aclUser);
-                duplicateUsers.add(aclUser);
-            }
-        }
-        if (duplicateUsers.size() > 0) {
-            String errorMessage = "user " + Arrays.toString(duplicateUsers.toArray())
-                    + " can't appear more than once in the ACLs for key [" + key + "].";
-            throw new AuthorizationException(errorMessage);
-        }
-    }
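As a quick illustration of the duplicate check (key and user names hypothetical), two entries naming the same user fail validation:

    List<AccessControl> acls = new ArrayList<AccessControl>();
    acls.add(BlobStoreAclHandler.parseAccessControl("u:alice:r--"));
    acls.add(BlobStoreAclHandler.parseAccessControl("u:alice:-w-"));
    // Throws AuthorizationException: user [alice] can't appear more than once in the ACLs for key [my-key].
    BlobStoreAclHandler.validateSettableACLs("my-key", acls);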
-
-    private Set<String> constructUserFromPrincipals(Subject who) {
-        Set<String> user = new HashSet<String>();
-        if (who != null) {
-            for (Principal p : who.getPrincipals()) {
-                user.add(_ptol.toLocal(p));
-            }
-        }
-        return user;
-    }
-
-    private boolean isAdmin(Subject who) {
-        Set<String> user = constructUserFromPrincipals(who);
-        for (String u : user) {
-            if (_admins.contains(u)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private boolean isReadOperation(int operation) {
-        return operation == READ;
-    }
-
-    private boolean isSupervisor(Subject who, int operation) {
-        Set<String> user = constructUserFromPrincipals(who);
-        if (isReadOperation(operation)) {
-            for (String u : user) {
-                if (_supervisors.contains(u)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    private boolean isNimbus(Subject who) {
-        Set<Principal> principals;
-        boolean isNimbusInstance = false;
-        if (who != null) {
-            principals = who.getPrincipals();
-            for (Principal principal : principals) {
-                if (principal instanceof NimbusPrincipal) {
-                    isNimbusInstance = true;
-                }
-            }
-        }
-        return isNimbusInstance;
-    }
-
-    public boolean checkForValidUsers(Subject who, int mask) {
-        return isNimbus(who) || isAdmin(who) || isSupervisor(who,mask);
-    }
-
-    /**
-     * The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN.
-     */
-    public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException {
-        hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key);
-    }
-
-    /**
-     * Validates if the user has any of the permissions
-     * mentioned in the mask.
-     * @param acl ACL for the key.
-     * @param mask mask holds the cumulative value of
-     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
-     * mask = 1 implies READ privilege.
-     * mask = 5 implies READ and ADMIN privileges.
-     * @param who Is the user against whom the permissions
-     * are validated for a key using the ACL and the mask.
-     * @param key Key used to identify the blob.
-     * @throws AuthorizationException
-     */
-    public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
-        Set<String> user = constructUserFromPrincipals(who);
-        LOG.debug("user {}", user);
-        if (checkForValidUsers(who, mask)) {
-            return;
-        }
-        for (AccessControl ac : acl) {
-            int allowed = getAllowed(ac, user);
-            LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key);
-            if ((allowed & mask) > 0) {
-                return;
-            }
-        }
-        throw new AuthorizationException(
-                user + " does not have access to " + key);
-    }
-
-    /**
-     * Validates if the user has at least the set of permissions
-     * mentioned in the mask.
-     * @param acl ACL for the key.
-     * @param mask mask holds the cumulative value of
-     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
-     * mask = 1 implies READ privilege.
-     * mask = 5 implies READ and ADMIN privileges.
-     * @param who Is the user against whom the permissions
-     * are validated for a key using the ACL and the mask.
-     * @param key Key used to identify the blob.
-     * @throws AuthorizationException
-     */
-    public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
-        Set<String> user = constructUserFromPrincipals(who);
-        LOG.debug("user {}", user);
-        if (checkForValidUsers(who, mask)) {
-            return;
-        }
-        for (AccessControl ac : acl) {
-            int allowed = getAllowed(ac, user);
-            mask = ~allowed & mask;
-            LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", user, allowed, mask, key);
-        }
-        if (mask == 0) {
-            return;
-        }
-        throw new AuthorizationException(
-                user + " does not have " + namedPerms(mask) + " access to " + key);
-    }
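The two checks above differ in how they treat the permission mask. hasAnyPermissions passes if any requested bit is granted, while hasPermissions clears granted bits from the mask and requires all of them to be covered. A worked example of the bit arithmetic:

    // READ = 0x01, WRITE = 0x02, ADMIN = 0x04, so a mask of 5 requests READ and ADMIN.
    int mask = BlobStoreAclHandler.READ | BlobStoreAclHandler.ADMIN; // 0b101 = 5

    // hasAnyPermissions: an ACL granting only READ gives (allowed & mask) = 1 & 5 = 1 > 0 -> allowed.
    // hasPermissions: the same ACL leaves mask = ~1 & 5 = 4 (ADMIN still missing) -> AuthorizationException.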
-
-    public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) {
-        meta.set_acl(normalizeSettableACLs(key, meta.get_acl(), who, opMask));
-    }
-
-    private String namedPerms(int mask) {
-        StringBuilder b = new StringBuilder();
-        b.append("[");
-        if ((mask & READ) > 0) {
-            b.append("READ ");
-        }
-        if ((mask & WRITE) > 0) {
-            b.append("WRITE ");
-        }
-        if ((mask & ADMIN) > 0) {
-            b.append("ADMIN ");
-        }
-        b.append("]");
-        return b.toString();
-    }
-
-    private int getAllowed(AccessControl ac, Set<String> users) {
-        switch (ac.get_type()) {
-            case OTHER:
-                return ac.get_access();
-            case USER:
-                if (users.contains(ac.get_name())) {
-                    return ac.get_access();
-                }
-                return 0;
-            default:
-                return 0;
-        }
-    }
-
-    private List<AccessControl> removeBadACLs(List<AccessControl> accessControls) {
-        List<AccessControl> resultAcl = new ArrayList<AccessControl>();
-        for (AccessControl control : accessControls) {
-            if (control.get_type().equals(AccessControlType.OTHER) && (control.get_access() == 0)) {
-                LOG.debug("Removing invalid blobstore world ACL " +
-                        BlobStoreAclHandler.accessControlToString(control));
-                continue;
-            }
-            resultAcl.add(control);
-        }
-        return resultAcl;
-    }
-
-    private final List<AccessControl> normalizeSettableACLs(String key, List<AccessControl> acls, Subject who, int opMask) {
-        List<AccessControl> cleanAcls = removeBadACLs(acls);
-        Set<String> userNames = getUserNamesFromSubject(who);
-        for (String user : userNames) {
-            fixACLsForUser(cleanAcls, user, opMask);
-        }
-        if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) {
-            cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
-            LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
-            if (!acls.isEmpty())
-                LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key);
-        }
-        return cleanAcls;
-    }
-
-    private boolean worldEverything(List<AccessControl> acls) {
-        boolean isWorldEverything = false;
-        for (AccessControl acl : acls) {
-            if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ|WRITE|ADMIN)) {
-                isWorldEverything = true;
-                break;
-            }
-        }
-        return isWorldEverything;
-    }
-
-    private void fixACLsForUser(List<AccessControl> acls, String user, int mask) {
-        boolean foundUserACL = false;
-        for (AccessControl control : acls) {
-            if (control.get_type() == AccessControlType.USER && control.get_name().equals(user)) {
-                int currentAccess = control.get_access();
-                if ((currentAccess & mask) != mask) {
-                    control.set_access(currentAccess | mask);
-                }
-                foundUserACL = true;
-                break;
-            }
-        }
-        if (!foundUserACL) {
-            AccessControl userACL = new AccessControl();
-            userACL.set_type(AccessControlType.USER);
-            userACL.set_name(user);
-            userACL.set_access(mask);
-            acls.add(userACL);
-        }
-    }
-
-    private Set<String> getUserNamesFromSubject(Subject who) {
-        Set<String> user = new HashSet<String>();
-        if (who != null) {
-            for(Principal p: who.getPrincipals()) {
-                user.add(_ptol.toLocal(p));
-            }
-        }
-        return user;
-    }
-}
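Taken together, the normalization path above guarantees that the caller keeps the permissions needed for the requested operation, and falls back to WORLD_EVERYTHING when no local user can be resolved from the Subject. A hypothetical walk-through (conf and aliceSubject are assumed to exist):

    SettableBlobMeta meta = new SettableBlobMeta();
    meta.set_acl(Arrays.asList(BlobStoreAclHandler.parseAccessControl("u:alice:r--")));

    BlobStoreAclHandler handler = new BlobStoreAclHandler(conf);
    // For a WRITE operation, fixACLsForUser ORs WRITE into alice's entry -> "u:alice:rw-".
    handler.normalizeSettableBlobMeta("my-key", meta, aliceSubject, BlobStoreAclHandler.WRITE);
    // Had the Subject carried no resolvable principals, WORLD_EVERYTHING would be appended instead.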

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
deleted file mode 100644
index 22ccf97..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.generated.SettableBlobMeta;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.regex.Pattern;
-
-/**
- * Provides a base implementation for creating a blob store backed by file storage.
- */
-public abstract class BlobStoreFile {
-    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreFile.class);
-
-    protected static final String TMP_EXT = ".tmp";
-    protected static final Pattern TMP_NAME_PATTERN = Pattern.compile("^\\d+\\" + TMP_EXT + "$");
-    protected static final String BLOBSTORE_DATA_FILE = "data";
-
-    public abstract void delete() throws IOException;
-    public abstract String getKey();
-    public abstract boolean isTmp();
-    public abstract void setMetadata(SettableBlobMeta meta);
-    public abstract SettableBlobMeta getMetadata();
-    public abstract long getModTime() throws IOException;
-    public abstract InputStream getInputStream() throws IOException;
-    public abstract OutputStream getOutputStream() throws IOException;
-    public abstract void commit() throws IOException;
-    public abstract void cancel() throws IOException;
-    public abstract long getFileLength() throws IOException;
-}
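The abstract contract above implies a temporary-file write protocol: data is streamed into a numbered ".tmp" file (see TMP_NAME_PATTERN) and then either committed, promoting it to the "data" file, or cancelled, discarding it. A minimal sketch of how a caller is expected to drive a BlobStoreFile (part and data are assumed; this is not the concrete file-backed implementation):

    OutputStream out = part.getOutputStream(); // writes to a "<sequence>.tmp" file
    try {
        out.write(data);
        out.close();
        part.commit();   // promote the tmp file to the blob's data file
    } catch (IOException e) {
        part.cancel();   // discard the partial tmp file
        throw e;
    }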

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java
deleted file mode 100644
index 97fb262..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.KeyAlreadyExistsException;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.generated.ReadableBlobMeta;
-import backtype.storm.nimbus.NimbusInfo;
-import backtype.storm.security.auth.NimbusPrincipal;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import backtype.storm.utils.ZookeeperAuthInfo;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class BlobStoreUtils {
-    private static final String BLOBSTORE_SUBTREE = "/blobstore";
-    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);
-
-    public static CuratorFramework createZKClient(Map conf) {
-        List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
-        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
-        CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo);
-        zkClient.start();
-        return zkClient;
-    }
-
-    public static Subject getNimbusSubject() {
-        Subject subject = new Subject();
-        subject.getPrincipals().add(new NimbusPrincipal());
-        return subject;
-    }
-
-    // Normalize state
-    public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(String nimbusSeqNumberInfo) {
-        BlobKeySequenceInfo keySequenceInfo = new BlobKeySequenceInfo();
-        int lastIndex = nimbusSeqNumberInfo.lastIndexOf("-");
-        keySequenceInfo.setNimbusHostPort(nimbusSeqNumberInfo.substring(0, lastIndex));
-        keySequenceInfo.setSequenceNumber(nimbusSeqNumberInfo.substring(lastIndex + 1));
-        return keySequenceInfo;
-    }
-
-    // Check for latest sequence number of a key inside zookeeper and return nimbodes containing the latest sequence number
-    public static Set<NimbusInfo> getNimbodesWithLatestSequenceNumberOfBlob(CuratorFramework zkClient, String key) throws Exception {
-        List<String> stateInfoList = zkClient.getChildren().forPath("/blobstore/" + key);
-        Set<NimbusInfo> nimbusInfoSet = new HashSet<NimbusInfo>();
-        int latestSeqNumber = getLatestSequenceNumber(stateInfoList);
-        LOG.debug("getNimbodesWithLatestSequenceNumberOfBlob stateInfo {} version {}", stateInfoList, latestSeqNumber);
-        // Get the nimbodes with the latest version
-        for (String state : stateInfoList) {
-            BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state);
-            if (latestSeqNumber == Integer.parseInt(sequenceInfo.getSequenceNumber())) {
-                nimbusInfoSet.add(NimbusInfo.parse(sequenceInfo.getNimbusHostPort()));
-            }
-        }
-        LOG.debug("nimbusInfoList {}", nimbusInfoSet);
-        return nimbusInfoSet;
-    }
-
-    // Get sequence number details from latest sequence number of the blob
-    public static int getLatestSequenceNumber(List<String> stateInfoList) {
-        int seqNumber = 0;
-        // Get latest sequence number of the blob present in the zookeeper --> possible to refactor this piece of code
-        for (String state : stateInfoList) {
-            BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state);
-            int currentSeqNumber = Integer.parseInt(sequenceInfo.getSequenceNumber());
-            if (seqNumber < currentSeqNumber) {
-                seqNumber = currentSeqNumber;
-                LOG.debug("Sequence Info {}", seqNumber);
-            }
-        }
-        LOG.debug("Latest Sequence Number {}", seqNumber);
-        return seqNumber;
-    }
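The child znodes read above are named "<host>:<port>-<sequenceNumber>", and the parser splits on the last "-" so that the host and port survive intact. A small sketch (hostnames hypothetical):

    List<String> states = Arrays.asList("nimbus1.example.com:6627-1", "nimbus2.example.com:6627-3");

    BlobKeySequenceInfo info = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(states.get(1));
    info.getNimbusHostPort();  // "nimbus2.example.com:6627"
    info.getSequenceNumber();  // "3"

    BlobStoreUtils.getLatestSequenceNumber(states); // 3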
-
-    // Download missing blobs from potential nimbodes
-    public static boolean downloadMissingBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
-            throws TTransportException {
-        NimbusClient client;
-        ReadableBlobMeta rbm;
-        ClientBlobStore remoteBlobStore;
-        InputStreamWithMeta in;
-        boolean isSuccess = false;
-        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
-        for (NimbusInfo nimbusInfo : nimbusInfos) {
-            if(isSuccess) {
-                break;
-            }
-            try {
-                client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null);
-                rbm = client.getClient().getBlobMeta(key);
-                remoteBlobStore = new NimbusBlobStore();
-                remoteBlobStore.setClient(conf, client);
-                in = remoteBlobStore.getBlob(key);
-                blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
-                // Confirm the blob was created by checking that the key now appears in the key listing
-                Iterator<String> keyIterator = blobStore.listKeys();
-                while (keyIterator.hasNext()) {
-                    if (keyIterator.next().equals(key)) {
-                        LOG.debug("Success creating key, {}", key);
-                        isSuccess = true;
-                        break;
-                    }
-                }
-            } catch (IOException | AuthorizationException exception) {
-                throw new RuntimeException(exception);
-            } catch (KeyAlreadyExistsException kae) {
-                LOG.info("KeyAlreadyExistsException Key: {} {}", key, kae);
-            } catch (KeyNotFoundException knf) {
-                // Catching and logging KeyNotFoundException because, if
-                // there is a subsequent update and delete, the non-leader
-                // nimbodes might throw an exception.
-                LOG.info("KeyNotFoundException Key: {} {}", key, knf);
-            } catch (Exception exp) {
-                // Logging an exception while client is connecting
-                LOG.error("Exception {}", exp);
-            }
-        }
-
-        if (!isSuccess) {
-            LOG.error("Could not download blob with key " + key);
-        }
-        return isSuccess;
-    }
-
-    // Download updated blobs from potential nimbodes
-    public static boolean downloadUpdatedBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
-            throws TTransportException {
-        NimbusClient client;
-        ClientBlobStore remoteBlobStore;
-        InputStreamWithMeta in;
-        AtomicOutputStream out;
-        boolean isSuccess = false;
-        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
-        for (NimbusInfo nimbusInfo : nimbusInfos) {
-            if (isSuccess) {
-                break;
-            }
-            try {
-                client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null);
-                remoteBlobStore = new NimbusBlobStore();
-                remoteBlobStore.setClient(conf, client);
-                in = remoteBlobStore.getBlob(key);
-                out = blobStore.updateBlob(key, getNimbusSubject());
-                byte[] buffer = new byte[2048];
-                int len = 0;
-                while ((len = in.read(buffer)) > 0) {
-                    out.write(buffer, 0, len);
-                }
-                // out is assigned before the copy loop and cannot be null here
-                out.close();
-                isSuccess = true;
-            } catch (IOException | AuthorizationException exception) {
-                throw new RuntimeException(exception);
-            } catch (KeyNotFoundException knf) {
-                // Catching and logging KeyNotFoundException because, if
-                // there is a subsequent update and delete, the non-leader
-                // nimbodes might throw an exception.
-                LOG.info("KeyNotFoundException {}", knf);
-            } catch (Exception exp) {
-                // Logging an exception while client is connecting
-                LOG.error("Exception {}", exp);
-            }
-        }
-
-        if (!isSuccess) {
-            LOG.error("Could not update the blob with key " + key);
-        }
-        return isSuccess;
-    }
-
-    // Get the list of keys from blobstore
-    public static List<String> getKeyListFromBlobStore(BlobStore blobStore) throws Exception {
-        Iterator<String> keys = blobStore.listKeys();
-        List<String> keyList = new ArrayList<String>();
-        if (keys != null) {
-            while (keys.hasNext()) {
-                keyList.add(keys.next());
-            }
-        }
-        LOG.debug("KeyList from blobstore {}", keyList);
-        return keyList;
-    }
-
-    public static void createStateInZookeeper(Map conf, String key, NimbusInfo nimbusInfo) throws TTransportException {
-        ClientBlobStore cb = new NimbusBlobStore();
-        cb.setClient(conf, new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null));
-        cb.createStateInZookeeper(key);
-    }
-
-    public static void updateKeyForBlobStore(Map conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) {
-        try {
-            // Most of the clojure tests currently access blobs using getBlob. Since updateKeyForBlobStore
-            // verifies that the correct version of the blob is updated as part of nimbus HA before performing
-            // any operation on it, several test cases would need to be stubbed to ignore this method. It is a
-            // valid trade-off to return early if nimbusDetails (the current nimbus host/port data) has not been
-            // initialized as part of the test. This applies only to the local blobstore when used with nimbus HA.
-            if (nimbusDetails == null) {
-                return;
-            }
-            boolean isListContainsCurrentNimbusInfo = false;
-            List<String> stateInfo;
-            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
-                return;
-            }
-            stateInfo = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
-            LOG.debug("StateInfo for update {}", stateInfo);
-            Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
-
-            for (NimbusInfo nimbusInfo:nimbusInfoList) {
-                if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) {
-                    isListContainsCurrentNimbusInfo = true;
-                    break;
-                }
-            }
-
-            if (!isListContainsCurrentNimbusInfo && downloadUpdatedBlob(conf, blobStore, key, nimbusInfoList)) {
-                LOG.debug("Updating state inside zookeeper for an update");
-                createStateInZookeeper(conf, key, nimbusDetails);
-            }
-        } catch (Exception exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-
-}
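Together these utilities implement the blob-sync side of nimbus HA: locate the nimbodes advertising the latest sequence number for a key, pull the blob from one of them, and then advertise the local copy in ZooKeeper. A hypothetical sync pass for one key on a non-leader nimbus (conf, blobStore, zkClient, key, and localNimbus are assumed):

    Set<NimbusInfo> sources = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
    boolean synced = BlobStoreUtils.getKeyListFromBlobStore(blobStore).contains(key)
            ? BlobStoreUtils.downloadUpdatedBlob(conf, blobStore, key, sources)
            : BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, sources);
    if (synced) {
        // Record that this nimbus now holds the latest version of the blob.
        BlobStoreUtils.createStateInZookeeper(conf, key, localNimbus);
    }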
