http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
deleted file mode 100644
index 1f20d7c..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.nimbus.NimbusInfo;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Called periodically on a non-leader nimbus, updating it with blobs based on the state
- * stored inside ZooKeeper so that it stays in sync with the operations performed on the leader nimbus.
- */
-public class BlobSynchronizer {
-    private static final Logger LOG = LoggerFactory.getLogger(BlobSynchronizer.class);
-    private CuratorFramework zkClient;
-    private Map conf;
-    private BlobStore blobStore;
-    private Set<String> blobStoreKeySet = new HashSet<String>();
-    private Set<String> zookeeperKeySet = new HashSet<String>();
-    private NimbusInfo nimbusInfo;
-
-    public BlobSynchronizer(BlobStore blobStore, Map conf) {
-        this.blobStore = blobStore;
-        this.conf = conf;
-    }
-
-    public void setNimbusInfo(NimbusInfo nimbusInfo) {
-        this.nimbusInfo = nimbusInfo;
-    }
-
-    public void setZookeeperKeySet(Set<String> zookeeperKeySet) {
-        this.zookeeperKeySet = zookeeperKeySet;
-    }
-
-    public void setBlobStoreKeySet(Set<String> blobStoreKeySet) {
-        this.blobStoreKeySet = blobStoreKeySet;
-    }
-
-    public Set<String> getBlobStoreKeySet() {
-        Set<String> keySet = new HashSet<String>();
-        keySet.addAll(blobStoreKeySet);
-        return keySet;
-    }
-
-    public Set<String> getZookeeperKeySet() {
-        Set<String> keySet = new HashSet<String>();
-        keySet.addAll(zookeeperKeySet);
-        return keySet;
-    }
-
-    public synchronized void syncBlobs() {
-        try {
-            LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}", getBlobStoreKeySet(), getZookeeperKeySet());
-            zkClient = BlobStoreUtils.createZKClient(conf);
-            deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet());
-            updateKeySetForBlobStore(getBlobStoreKeySet());
-            Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
-            LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);
-
-            for (String key : keySetToDownload) {
-                Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
-                if (BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
-                    BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
-                }
-            }
-            if (zkClient != null) {
-                zkClient.close();
-            }
-        } catch (InterruptedException exp) {
-            LOG.error("InterruptedException", exp);
-        } catch (Exception exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-
-    public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> keySetBlobStore, Set<String> keySetZookeeper) throws Exception {
-        if (keySetBlobStore.removeAll(keySetZookeeper)
-                || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) {
-            LOG.debug("Key set to delete in blobstore {}", keySetBlobStore);
-            for (String key : keySetBlobStore) {
-                blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());
-            }
-        }
-    }
-
-    // Update current key list inside the blobstore if the version changes
-    public void updateKeySetForBlobStore(Set<String> keySetBlobStore) {
-        try {
-            for (String key : keySetBlobStore) {
-                LOG.debug("updating blob");
-                BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, zkClient, key, nimbusInfo);
-            }
-        } catch (Exception exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-
-    // Make a key list to download
-    public Set<String> getKeySetToDownload(Set<String> blobStoreKeySet, Set<String> zookeeperKeySet) {
-        zookeeperKeySet.removeAll(blobStoreKeySet);
-        LOG.debug("Key list to download {}", zookeeperKeySet);
-        return zookeeperKeySet;
-    }
-}
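
For context, the synchronization above reduces to three set operations over the blobstore and ZooKeeper key sets: delete keys that exist only locally, refresh keys present in both, and download keys that exist only in ZooKeeper. A minimal standalone sketch of that reconciliation (the class below is illustrative only and not part of this commit; unlike deleteKeySetFromBlobStoreNotOnZookeeper(), which mutates its arguments via removeAll(), it works on copies):

    import java.util.HashSet;
    import java.util.Set;

    public final class SyncSketch {
        // Computes the three key sets the synchronizer acts on.
        public static void reconcile(Set<String> blobStoreKeys, Set<String> zookeeperKeys) {
            Set<String> toDelete = new HashSet<>(blobStoreKeys);
            toDelete.removeAll(zookeeperKeys);     // local-only keys: stale, delete them

            Set<String> toRefresh = new HashSet<>(blobStoreKeys);
            toRefresh.retainAll(zookeeperKeys);    // shared keys: re-check their versions

            Set<String> toDownload = new HashSet<>(zookeeperKeys);
            toDownload.removeAll(blobStoreKeys);   // ZooKeeper-only keys: fetch from a peer nimbus

            System.out.println("delete=" + toDelete + " refresh=" + toRefresh + " download=" + toDownload);
        }
    }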

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java
deleted file mode 100644
index 6408469..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.daemon.Shutdownable;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.ReadableBlobMeta;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.generated.KeyAlreadyExistsException;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.utils.NimbusClient;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * The ClientBlobStore has two concrete implementations:
- * 1. NimbusBlobStore
- * 2. HdfsClientBlobStore
- *
- * Create, update, read and delete are some of the basic operations defined by this interface.
- * Each operation is validated for permissions against a user. We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS
- * configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN access whereas the SUPERVISOR_ADMINS are given READ
- * access in order to read and download the blobs from the nimbus.
- *
- * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER
- * who has read, write or admin privileges in order to perform the respective operations on the blob.
- *
- * For more detailed implementations,
- * @see backtype.storm.blobstore.NimbusBlobStore
- * @see backtype.storm.blobstore.LocalFsBlobStore
- * @see org.apache.storm.hdfs.blobstore.HdfsClientBlobStore
- * @see org.apache.storm.hdfs.blobstore.HdfsBlobStore
- */
-public abstract class ClientBlobStore implements Shutdownable {
-    protected Map conf;
-
-    /**
-     * Sets up the client API by parsing the configs.
-     * @param conf The storm conf containing the config details.
-     */
-    public abstract void prepare(Map conf);
-
-    /**
-     * Client facing API to create a blob.
-     * @param key blob key name.
-     * @param meta contains ACL information.
-     * @return AtomicOutputStream returns an output stream into which data can be written.
-     * @throws AuthorizationException
-     * @throws KeyAlreadyExistsException
-     */
-    protected abstract AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException;
-
-    /**
-     * Client facing API to update a blob.
-     * @param key blob key name.
-     * @return AtomicOutputStream returns an output stream into which data can be written.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract AtomicOutputStream updateBlob(String key) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Client facing API to read the metadata information.
-     * @param key blob key name.
-     * @return ReadableBlobMeta containing the metadata for the blob.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Client facing API to set the metadata for a blob.
-     * @param key blob key name.
-     * @param meta contains ACL information.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    protected abstract void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Client facing API to delete a blob.
-     * @param key blob key name.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Client facing API to read a blob.
-     * @param key blob key name.
-     * @return an InputStreamWithMeta to read the blob data and its metadata.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * @return Iterator for a list of keys currently present in the blob store.
-     */
-    public abstract Iterator<String> listKeys();
-
-    /**
-     * Client facing API to read the replication of a blob.
-     * @param key blob key name.
-     * @return int indicates the replication factor of a blob.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Client facing API to update the replication of a blob.
-     * @param key blob key name.
-     * @param replication the replication factor to set for the blob.
-     * @return int indicates the replication factor of a blob.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public abstract int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException;
-
-    /**
-     * Client facing API to set a nimbus client.
-     * @param conf storm conf
-     * @param client NimbusClient
-     * @return indicates whether the client connection has been set up.
-     */
-    public abstract boolean setClient(Map conf, NimbusClient client);
-
-    /**
-     * Creates state inside ZooKeeper.
-     * Required for the blobstore to write to ZooKeeper when Nimbus HA
-     * is turned on, in order to maintain state consistency.
-     * @param key blob key name.
-     */
-    public abstract void createStateInZookeeper(String key);
-
-    /**
-     * Client facing API to create a blob.
-     * @param key blob key name.
-     * @param meta contains ACL information.
-     * @return AtomicOutputStream returns an output stream into which data can be written.
-     * @throws AuthorizationException
-     * @throws KeyAlreadyExistsException
-     */
-    public final AtomicOutputStream createBlob(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException {
-        if (meta != null && meta.is_set_acl()) {
-            BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        }
-        return createBlobToExtend(key, meta);
-    }
-
-    /**
-     * Client facing API to set the metadata for a blob.
-     * @param key blob key name.
-     * @param meta contains ACL information.
-     * @throws AuthorizationException
-     * @throws KeyNotFoundException
-     */
-    public final void setBlobMeta(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException {
-        if (meta != null && meta.is_set_acl()) {
-            BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        }
-        setBlobMetaToExtend(key, meta);
-    }
-
-
-}
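
As a usage illustration, a caller typically creates a blob under a key and streams data into the returned AtomicOutputStream, where close() commits and cancel() aborts. A hedged sketch against the interface above (assumed to live in the same package; the concrete store instance, key name and payload are assumptions, not part of this commit):

    import backtype.storm.generated.SettableBlobMeta;

    public final class ClientBlobStoreExample {
        // createBlob() validates any ACLs set on meta before delegating
        // to the implementation's createBlobToExtend().
        static void upload(ClientBlobStore store) throws Exception {
            SettableBlobMeta meta = new SettableBlobMeta();  // no explicit ACLs here; real callers usually set meta.set_acl(...)
            AtomicOutputStream out = store.createBlob("my-key", meta);
            try {
                out.write("hello blobstore".getBytes("UTF-8"));
                out.close();    // close() publishes the blob atomically
            } catch (Exception e) {
                out.cancel();   // cancel() discards the partial write
                throw e;
            }
        }
    }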

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java b/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java
deleted file mode 100644
index b789335..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * Very basic blob store impl with no ACL handling.
- */
-public class FileBlobStoreImpl {
-    private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000L;
-    private static final int BUCKETS = 1024;
-    private static final Logger LOG = LoggerFactory.getLogger(FileBlobStoreImpl.class);
-    private static final Timer timer = new Timer("FileBlobStore cleanup thread", true);
-
-    public class KeyInHashDirIterator implements Iterator<String> {
-        private int currentBucket = 0;
-        private Iterator<String> it = null;
-        private String next = null;
-
-        public KeyInHashDirIterator() throws IOException {
-            primeNext();
-        }
-
-        private void primeNext() throws IOException {
-            while (it == null && currentBucket < BUCKETS) {
-                String name = String.valueOf(currentBucket);
-                File dir = new File(fullPath, name);
-                try {
-                    it = listKeys(dir);
-                } catch (FileNotFoundException e) {
-                    it = null;
-                }
-                if (it == null || !it.hasNext()) {
-                    it = null;
-                    currentBucket++;
-                } else {
-                    next = it.next();
-                }
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return next != null;
-        }
-
-        @Override
-        public String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            String current = next;
-            next = null;
-            if (it != null) {
-                if (!it.hasNext()) {
-                    it = null;
-                    currentBucket++;
-                    try {
-                        primeNext();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                } else {
-                    next = it.next();
-                }
-            }
-            return current;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Delete Not Supported");
-        }
-    }
-
-    private File fullPath;
-    private TimerTask cleanup = null;
-
-    public FileBlobStoreImpl(File path, Map<String, Object> conf) throws IOException {
-        LOG.info("Creating new blob store based in {}", path);
-        fullPath = path;
-        fullPath.mkdirs();
-        Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
-        if (Utils.getBoolean(shouldCleanup, false)) {
-            LOG.debug("Starting File blobstore cleaner");
-            cleanup = new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        fullCleanup(FULL_CLEANUP_FREQ);
-                    } catch (IOException e) {
-                        LOG.error("Error trying to cleanup", e);
-                    }
-                }
-            };
-            timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
-        }
-    }
-
-    /**
-     * @return all keys that are available for reading.
-     * @throws IOException on any error.
-     */
-    public Iterator<String> listKeys() throws IOException {
-        return new KeyInHashDirIterator();
-    }
-
-    /**
-     * Get an input stream for reading a part.
-     * @param key the key of the part to read.
-     * @return the file to read the data from.
-     * @throws IOException on any error
-     */
-    public LocalFsBlobStoreFile read(String key) throws IOException {
-        return new LocalFsBlobStoreFile(getKeyDir(key), BlobStoreFile.BLOBSTORE_DATA_FILE);
-    }
-
-    /**
-     * Get an object tied to writing the data.
-     * @param key the key of the part to write to.
-     * @return an object that can be used to write to, and also to commit or cancel the operation.
-     * @throws IOException on any error
-     */
-    public LocalFsBlobStoreFile write(String key, boolean create) throws IOException {
-        return new LocalFsBlobStoreFile(getKeyDir(key), true, create);
-    }
-
-    /**
-     * Check if the key exists in the blob store.
-     * @param key the key to check for
-     * @return true if it exists else false.
-     */
-    public boolean exists(String key) {
-        return getKeyDir(key).exists();
-    }
-
-    /**
-     * Delete a key from the blob store
-     * @param key the key to delete
-     * @throws IOException on any error
-     */
-    public void deleteKey(String key) throws IOException {
-        File keyDir = getKeyDir(key);
-        LocalFsBlobStoreFile pf = new LocalFsBlobStoreFile(keyDir, BlobStoreFile.BLOBSTORE_DATA_FILE);
-        pf.delete();
-        delete(keyDir);
-    }
-
-    private File getKeyDir(String key) {
-        String hash = String.valueOf(Math.abs((long)key.hashCode()) % BUCKETS);
-        File ret = new File(new File(fullPath, hash), key);
-        LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, 
hash});
-        return ret;
-    }
-
-    public void fullCleanup(long age) throws IOException {
-        long cleanUpIfBefore = System.currentTimeMillis() - age;
-        Iterator<String> keys = new KeyInHashDirIterator();
-        while (keys.hasNext()) {
-            String key = keys.next();
-            File keyDir = getKeyDir(key);
-            Iterator<LocalFsBlobStoreFile> i = listBlobStoreFiles(keyDir);
-            if (!i.hasNext()) {
-                //The dir is empty, so try to delete it, may fail, but that is OK
-                try {
-                    keyDir.delete();
-                } catch (Exception e) {
-                    LOG.warn("Could not delete " + keyDir + " will try again later");
-                }
-            }
-            while (i.hasNext()) {
-                LocalFsBlobStoreFile f = i.next();
-                if (f.isTmp()) {
-                    if (f.getModTime() <= cleanUpIfBefore) {
-                        f.delete();
-                    }
-                }
-            }
-        }
-    }
-
-    protected Iterator<LocalFsBlobStoreFile> listBlobStoreFiles(File path) throws IOException {
-        ArrayList<LocalFsBlobStoreFile> ret = new ArrayList<LocalFsBlobStoreFile>();
-        File[] files = path.listFiles();
-        if (files != null) {
-            for (File sub : files) {
-                try {
-                    ret.add(new LocalFsBlobStoreFile(sub.getParentFile(), sub.getName()));
-                } catch (IllegalArgumentException e) {
-                    //Ignored: the file did not match
-                    LOG.warn("Found an unexpected file in {} {}", path, sub.getName());
-                }
-            }
-        }
-        return ret.iterator();
-    }
-
-    protected Iterator<String> listKeys(File path) throws IOException {
-        String[] files = path.list();
-        if (files != null) {
-            return Arrays.asList(files).iterator();
-        }
-        return new LinkedList<String>().iterator();
-    }
-
-    protected void delete(File path) throws IOException {
-        Files.deleteIfExists(path.toPath());
-    }
-
-    public void shutdown() {
-        if (cleanup != null) {
-            cleanup.cancel();
-            cleanup = null;
-        }
-    }
-}
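
The layout above shards keys into 1024 hash buckets so no single directory accumulates every blob. A standalone sketch of the same bucket computation used by getKeyDir() (class and method names below are illustrative only):

    import java.io.File;

    public final class BucketPathSketch {
        private static final int BUCKETS = 1024;

        // Mirrors getKeyDir(): <base>/<abs(hash) % 1024>/<key>.
        // The cast to long before Math.abs() avoids the Integer.MIN_VALUE
        // edge case, where Math.abs(int) would stay negative.
        static File keyDir(File base, String key) {
            long bucket = Math.abs((long) key.hashCode()) % BUCKETS;
            return new File(new File(base, String.valueOf(bucket)), key);
        }

        public static void main(String[] args) {
            System.out.println(keyDir(new File("/tmp/blobs"), "data_topology-1"));
        }
    }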

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java b/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java
deleted file mode 100644
index 1d29fda..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public abstract class InputStreamWithMeta extends InputStream {
-    public abstract long getVersion() throws IOException;
-    public abstract long getFileLength() throws IOException;
-}
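
Because the stream advertises its total length up front, callers can pre-size a buffer instead of growing one. A small consumption sketch (assuming the same package, that the blob fits in memory, and a hypothetical helper name):

    import java.io.IOException;

    public final class BlobReadSketch {
        // Reads an entire blob using the advertised length.
        static byte[] readFully(InputStreamWithMeta in) throws IOException {
            byte[] buf = new byte[(int) in.getFileLength()];  // assumes blob < 2 GB
            int off = 0;
            while (off < buf.length) {
                int n = in.read(buf, off, buf.length - off);
                if (n < 0) {
                    throw new IOException("Stream ended before advertised length");
                }
                off += n;
            }
            return buf;
        }
    }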

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java b/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
deleted file mode 100644
index 32bb9fd..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-public interface KeyFilter<R> {
-    R filter(String key);
-}
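
The generic return type lets a filter select and transform keys in a single pass, with null dropping a key. For example, a hypothetical filter that keeps only keys carrying a given prefix and strips it (this mirrors how the prefixed data_/meta_ keys are translated elsewhere in the blobstore):

    public final class PrefixKeyFilter implements KeyFilter<String> {
        private final String prefix;

        public PrefixKeyFilter(String prefix) {
            this.prefix = prefix;
        }

        // Returns the key with the prefix removed, or null to filter it out.
        @Override
        public String filter(String key) {
            return key.startsWith(prefix) ? key.substring(prefix.length()) : null;
        }
    }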

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
deleted file mode 100644
index 2a53828..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.blobstore;
-
-import backtype.storm.nimbus.NimbusInfo;
-import backtype.storm.utils.Utils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.TreeSet;
-import java.util.Map;
-import java.util.List;
-
-/**
- * This class hands over the key sequence number, which implies the number of updates made to a blob.
- * The information regarding the keys and the sequence number representing the number of updates is
- * stored within the zookeeper in the following format:
- * /storm/blobstore/key_name/nimbushostport-sequencenumber
- * Example:
- * If there are two nimbodes and nimbus.seeds is set to leader,non-leader,
- * then the state inside the zookeeper is eventually stored as:
- * /storm/blobstore/key1/leader:8080-1
- * /storm/blobstore/key1/non-leader:8080-1
- * This indicates that a new blob with the name key1 has been created on the leader
- * nimbus, and the non-leader nimbus syncs after a callback is triggered by attempting
- * to download the blob, finally updating its state inside the zookeeper.
- *
- * A watch is placed on /storm/blobstore/key1, and the znodes leader:8080-1 and
- * non-leader:8080-1 are ephemeral, which implies that these nodes exist only as long as the
- * connection between the corresponding nimbus and the zookeeper persists. If the
- * nimbus crashes, its node disappears under /storm/blobstore/key1.
- *
- * The sequence number for the keys is handed over based on the following scenarios.
- * Let's assume there are three nimbodes up and running, one being the leader and the
- * others being non-leaders.
- *
- * 1. Create is straightforward.
- * Check whether the znode -> /storm/blobstore/key1 has been created or not. If not, it implies
- * the blob has not been created yet. The nimbus then creates it and updates the zookeeper
- * states under /storm/blobstore/key1 and /storm/blobstoremaxkeysequencenumber/key1.
- * The znodes it creates on these nodes are /storm/blobstore/key1/leader:8080-1,
- * /storm/blobstore/key1/non-leader:8080-1 and /storm/blobstoremaxkeysequencenumber/key1/1.
- * The latter holds the global sequence number across all nimbodes, more like a static variable
- * indicating the true number of updates for a blob. This node helps to maintain sanity in case
- * leadership changes due to crashing.
- *
- * 2. Delete does not require handing over the sequence number.
- *
- * 3. Finally, the update has a few scenarios.
- *
- *  The class implements a TreeSet. The basic idea is that if all the nimbodes have the same
- *  sequence number for the blob, then the number of elements in the set is 1, which holds
- *  the latest value of the sequence number. If the number of elements is greater than 1, it
- *  implies that there is a sequence mismatch and a need for syncing the blobs across
- *  nimbodes.
- *
- *  The logic for handing over sequence numbers based on the state is described as follows.
- *  Here consider Nimbus-1 aliased as N1 and Nimbus-2 aliased as N2.
- *  Scenario 1:
- *  Example: Normal create/update scenario
- *  Operation     Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1   Seq-Num-Nimbus-2           Max-Seq-Num
- *  Create-Key1   alive - Leader     alive              1                                             1
- *  Sync          alive - Leader     alive              1                  1 (callback -> download)   1
- *  Update-Key1   alive - Leader     alive              2                  1                          2
- *  Sync          alive - Leader     alive              2                  2 (callback -> download)   2
- *
- *  Scenario 2:
- *  Example: Leader nimbus crash followed by leader election, update and ex-leader restored again
- *  Operation     Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1   Seq-Num-Nimbus-2           Max-Seq-Num
- *  Create        alive - Leader     alive              1                                             1
- *  Sync          alive - Leader     alive              1                  1 (callback -> download)   1
- *  Update        alive - Leader     alive              2                  1                          2
- *  Sync          alive - Leader     alive              2                  2 (callback -> download)   2
- *  Update        alive - Leader     alive              3                  2                          3
- *  Crash         crash - Leader     alive              3                  2                          3
- *  New - Leader  crash              alive - Leader     3 (Invalid)        2                          3
- *  Update        crash              alive - Leader     3 (Invalid)        4 (max-seq-num + 1)        4
- *  N1-Restored   alive              alive - Leader     0                  4                          4
- *  Sync          alive              alive - Leader     4                  4                          4
- *
- *  Scenario 3:
- *  Example: Leader nimbus crash followed by leader election, update and ex-leader restored again
- *  Operation     Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1   Seq-Num-Nimbus-2           Max-Seq-Num
- *  Create        alive - Leader     alive              1                                             1
- *  Sync          alive - Leader     alive              1                  1 (callback -> download)   1
- *  Update        alive - Leader     alive              2                  1                          2
- *  Sync          alive - Leader     alive              2                  2 (callback -> download)   2
- *  Update        alive - Leader     alive              3                  2                          3
- *  Crash         crash - Leader     alive              3                  2                          3
- *  Elect Leader  crash              alive - Leader     3 (Invalid)        2                          3
- *  N1-Restored   alive              alive - Leader     3                  2                          3
- *  Read/Update   alive              alive - Leader     3                  4 (Downloads from N1)      4
- *  Sync          alive              alive - Leader     4 (callback)       4                          4
- *  Here the download is triggered whenever an operation corresponding to the blob, like a read
- *  or update, is performed on the nimbus. In the read/update call it is hard to know which call
- *  is a read and which is an update. Hence, by incrementing the sequence number to max-seq-num + 1
- *  we ensure that the synchronization happens appropriately and all nimbodes have the same blob.
- */
-public class KeySequenceNumber {
-    private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class);
-    private final String BLOBSTORE_SUBTREE = "/blobstore";
-    private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE = "/blobstoremaxkeysequencenumber";
-    private final String key;
-    private final NimbusInfo nimbusInfo;
-    private final int INT_CAPACITY = 4;
-    private final int INITIAL_SEQUENCE_NUMBER = 1;
-
-    public KeySequenceNumber(String key, NimbusInfo nimbusInfo) {
-        this.key = key;
-        this.nimbusInfo = nimbusInfo;
-    }
-
-    public synchronized int getKeySequenceNumber(Map conf) {
-        TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
-        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
-        try {
-            // Key has not been created yet and it is the first time it is being created
-            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
-                zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
-                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key);
-                zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
-                        ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array());
-                return INITIAL_SEQUENCE_NUMBER;
-            }
-
-            // When all nimbodes go down and one or a few of them come up.
-            // Unfortunately there might not be an exact way to know which one contains the most updated blob,
-            // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
-            List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
-            LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList);
-            if (stateInfoList.isEmpty()) {
-                return getMaxSequenceNumber(zkClient);
-            }
-
-            LOG.debug("stateInfoSize {}", stateInfoList.size());
-            // In all other cases check for the latest update sequence of the blob on the nimbus
-            // and assign the appropriate number. Check if all have the same sequence number;
-            // if not, assign the highest sequence number.
-            for (String stateInfo : stateInfoList) {
-                sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo)
-                        .getSequenceNumber()));
-            }
-
-            // Update scenarios 2 and 3 explain the code logic written here,
-            // especially when nimbus crashes and comes up after and before an update
-            // respectively.
-            int currentSeqNumber = getMaxSequenceNumber(zkClient);
-            if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
-                if (sequenceNumbers.last() < currentSeqNumber) {
-                    return currentSeqNumber;
-                } else {
-                    return INITIAL_SEQUENCE_NUMBER - 1;
-                }
-            }
-
-            // It covers scenarios explained in scenario 3 when nimbus-1, holding the latest
-            // update, goes down before it is downloaded by nimbus-2. Nimbus-2 gets elected as a leader,
-            // after which nimbus-1 comes back up and a read or update is performed.
-            if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && nimbusInfo.isLeader()) {
-                incrementMaxSequenceNumber(zkClient, currentSeqNumber);
-                return currentSeqNumber + 1;
-            }
-
-            // This code logic covers the update scenarios in 2 when nimbus-1 goes down
-            // before syncing the blob to nimbus-2 and an update happens.
-            // If the seq-num for nimbus-2 is 2 and the max-seq-number is 3 then the next sequence number is 4
-            // (max-seq-number + 1).
-            // The other scenario it covers is when the max-seq-number and the nimbus seq number are equal.
-            if (sequenceNumbers.size() == 1) {
-                if (sequenceNumbers.first() < currentSeqNumber) {
-                    incrementMaxSequenceNumber(zkClient, currentSeqNumber);
-                    return currentSeqNumber + 1;
-                } else {
-                    incrementMaxSequenceNumber(zkClient, currentSeqNumber);
-                    return sequenceNumbers.first() + 1;
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("Exception", e);
-        } finally {
-            if (zkClient != null) {
-                zkClient.close();
-            }
-        }
-        // Normal create/update sync scenario returns the greatest sequence number in the set
-        return sequenceNumbers.last();
-    }
-
-    private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) {
-        boolean containsNimbusHost = false;
-        for (String stateInfo : stateInfoList) {
-            if (stateInfo.contains(nimbusInfo.getHost())) {
-                containsNimbusHost = true;
-                break;
-            }
-        }
-        return containsNimbusHost;
-    }
-
-    private void incrementMaxSequenceNumber(CuratorFramework zkClient, int count) throws Exception {
-        zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
-                ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array());
-    }
-
-    private int getMaxSequenceNumber(CuratorFramework zkClient) throws Exception {
-        return ByteBuffer.wrap(zkClient.getData()
-                .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key)).getInt();
-    }
-}
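
Condensed, the branching above picks the next sequence number from the per-nimbus numbers recorded in ZooKeeper plus the global max counter. A hedged sketch of just that decision rule, with the ZooKeeper lookups replaced by parameters (note the real method also persists the incremented max back to ZooKeeper in the branches that advance it):

    import java.util.TreeSet;

    public final class SeqNumberSketch {
        // nimbusSeqNumbers: per-nimbus numbers under /storm/blobstore/<key>;
        // maxSeqNumber: the counter under /storm/blobstoremaxkeysequencenumber/<key>.
        static int nextSequenceNumber(TreeSet<Integer> nimbusSeqNumbers, int maxSeqNumber,
                                      boolean stateHasThisNimbus, boolean isLeader) {
            if (!stateHasThisNimbus && !isLeader) {
                // Restored non-leader: adopt the max if it is ahead, else start over (scenario 2).
                return nimbusSeqNumbers.last() < maxSeqNumber ? maxSeqNumber : 0;
            }
            if (!stateHasThisNimbus && isLeader) {
                // Leader restored after a crash: move past the global max (scenario 3).
                return maxSeqNumber + 1;
            }
            if (nimbusSeqNumbers.size() == 1) {
                // All nimbodes agree: advance past whichever counter is larger.
                int agreed = nimbusSeqNumbers.first();
                return (agreed < maxSeqNumber ? maxSeqNumber : agreed) + 1;
            }
            return nimbusSeqNumbers.last();  // mismatch: sync to the highest known number
        }
    }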

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
deleted file mode 100644
index b8daad2..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.Config;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.KeyAlreadyExistsException;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.generated.ReadableBlobMeta;
-
-import backtype.storm.nimbus.NimbusInfo;
-import backtype.storm.utils.Utils;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
-import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
-import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
-
-/**
- * Provides a local file system backed blob store implementation for Nimbus.
- *
- * For a local blob store, the user and the supervisor use the NimbusBlobStore Client API in order to talk to nimbus through thrift.
- * The authentication and authorization here is based on the subject.
- * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN
- * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs from the nimbus.
- *
- * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER
- * who has read, write or admin privileges in order to perform the respective operations on the blob.
- *
- * For the local blob store:
- * 1. The USER interacts with nimbus to upload and access blobs through the NimbusBlobStore Client API.
- * 2. The USER sets the ACLs, and the blob access is validated against these ACLs.
- * 3. The SUPERVISOR interacts with nimbus through the NimbusBlobStore Client API to download the blobs.
- * The supervisor's principal should match the set of users configured into SUPERVISOR_ADMINS.
- * Here, the PrincipalToLocalPlugin takes care of mapping the principal to the user name before the ACL validation.
- */
-public class LocalFsBlobStore extends BlobStore {
-    public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class);
-    private static final String DATA_PREFIX = "data_";
-    private static final String META_PREFIX = "meta_";
-    protected BlobStoreAclHandler _aclHandler;
-    private final String BLOBSTORE_SUBTREE = "/blobstore/";
-    private NimbusInfo nimbusInfo;
-    private FileBlobStoreImpl fbs;
-    private final int allPermissions = READ | WRITE | ADMIN;
-    private Map conf;
-    private CuratorFramework zkClient;
-
-    @Override
-    public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
-        this.conf = conf;
-        this.nimbusInfo = nimbusInfo;
-        zkClient = BlobStoreUtils.createZKClient(conf);
-        if (overrideBase == null) {
-            overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
-            if (overrideBase == null) {
-                overrideBase = (String) conf.get(Config.STORM_LOCAL_DIR);
-            }
-        }
-        File baseDir = new File(overrideBase, BASE_BLOBS_DIR_NAME);
-        try {
-            fbs = new FileBlobStoreImpl(baseDir, conf);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        _aclHandler = new BlobStoreAclHandler(conf);
-    }
-
-    @Override
-    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException {
-        LOG.debug("Creating Blob for key {}", key);
-        validateKey(key);
-        _aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions);
-        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        _aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key);
-        if (fbs.exists(DATA_PREFIX+key)) {
-            throw new KeyAlreadyExistsException(key);
-        }
-        BlobStoreFileOutputStream mOut = null;
-        try {
-            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, true));
-            mOut.write(Utils.thriftSerialize(meta));
-            mOut.close();
-            mOut = null;
-            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, true));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (mOut != null) {
-                try {
-                    mOut.cancel();
-                } catch (IOException e) {
-                    //Ignored
-                }
-            }
-        }
-    }
-
-    @Override
-    public AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
-        validateKey(key);
-        checkForBlobOrDownload(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
-        try {
-            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, false));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
-        InputStream in = null;
-        try {
-            LocalFsBlobStoreFile pf = fbs.read(META_PREFIX+key);
-            try {
-                in = pf.getInputStream();
-            } catch (FileNotFoundException fnf) {
-                throw new KeyNotFoundException(key);
-            }
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            byte [] buffer = new byte[2048];
-            int len;
-            while ((len = in.read(buffer)) > 0) {
-                out.write(buffer, 0, len);
-            }
-            in.close();
-            in = null;
-            return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (in != null) {
-                try {
-                    in.close();
-                } catch (IOException e) {
-                    //Ignored
-                }
-            }
-        }
-    }
-
-    @Override
-    public ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
-        validateKey(key);
-        if(!checkForBlobOrDownload(key)) {
-            checkForBlobUpdate(key);
-        }
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
-        ReadableBlobMeta rbm = new ReadableBlobMeta();
-        rbm.set_settable(meta);
-        try {
-            LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX+key);
-            rbm.set_version(pf.getModTime());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return rbm;
-    }
-
-    @Override
-    public void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException {
-        validateKey(key);
-        checkForBlobOrDownload(key);
-        _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
-        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        SettableBlobMeta orig = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
-        BlobStoreFileOutputStream mOut = null;
-        try {
-            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, false));
-            mOut.write(Utils.thriftSerialize(meta));
-            mOut.close();
-            mOut = null;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (mOut != null) {
-                try {
-                    mOut.cancel();
-                } catch (IOException e) {
-                    //Ignored
-                }
-            }
-        }
-    }
-
-    @Override
-    public void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
-        validateKey(key);
-        checkForBlobOrDownload(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
-        try {
-            fbs.deleteKey(DATA_PREFIX+key);
-            fbs.deleteKey(META_PREFIX+key);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
-        validateKey(key);
-        if(!checkForBlobOrDownload(key)) {
-            checkForBlobUpdate(key);
-        }
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
-        try {
-            return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX+key));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Iterator<String> listKeys() {
-        try {
-            return new KeyTranslationIterator(fbs.listKeys(), DATA_PREFIX);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        if (zkClient != null) {
-            zkClient.close();
-        }
-    }
-
-    @Override
-    public int getBlobReplication(String key, Subject who) throws Exception {
-        int replicationCount = 0;
-        validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
-        if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
-            return 0;
-        }
-        replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
-        return replicationCount;
-    }
-
-    @Override
-    public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
-        throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. " +
-                "Please use HDFS blob store to make this feature available.");
-    }
-
-    //This additional check and download is for nimbus high availability in case you have more than one nimbus
-    public synchronized boolean checkForBlobOrDownload(String key) {
-        boolean checkBlobDownload = false;
-        try {
-            List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
-            if (!keyList.contains(key)) {
-                if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) != null) {
-                    Set<NimbusInfo> nimbusSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
-                    if (BlobStoreUtils.downloadMissingBlob(conf, this, key, nimbusSet)) {
-                        LOG.debug("Updating blobs state");
-                        BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
-                        checkBlobDownload = true;
-                    }
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        return checkBlobDownload;
-    }
-
-    public synchronized void checkForBlobUpdate(String key) {
-        BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
-    }
-
-    public void fullCleanup(long age) throws IOException {
-        fbs.fullCleanup(age);
-    }
-}
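
One detail worth calling out: getBlobReplication() above derives the replication factor by counting the per-nimbus znodes recorded for the key. A minimal Curator sketch of that count in isolation (client construction and error handling elided; the class name is illustrative):

    import org.apache.curator.framework.CuratorFramework;

    public final class ReplicationSketch {
        // Replication factor == number of nimbus znodes recorded under /blobstore/<key>.
        static int replicationOf(CuratorFramework zk, String key) throws Exception {
            String path = "/blobstore/" + key;
            if (zk.checkExists().forPath(path) == null) {
                return 0;  // key not yet registered in ZooKeeper
            }
            return zk.getChildren().forPath(path).size();
        }
    }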

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java
deleted file mode 100644
index fb11fa6..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.generated.SettableBlobMeta;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.regex.Matcher;
-
-public class LocalFsBlobStoreFile extends BlobStoreFile {
-
-    private final String _key;
-    private final boolean _isTmp;
-    private final File _path;
-    private Long _modTime = null;
-    private final boolean _mustBeNew;
-    private SettableBlobMeta meta;
-
-    public LocalFsBlobStoreFile(File base, String name) {
-        if (BlobStoreFile.BLOBSTORE_DATA_FILE.equals(name)) {
-            _isTmp = false;
-        } else {
-            Matcher m = TMP_NAME_PATTERN.matcher(name);
-            if (!m.matches()) {
-                throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN);
-            }
-            _isTmp = true;
-        }
-        _key = base.getName();
-        _path = new File(base, name);
-        _mustBeNew = false;
-    }
-
-    public LocalFsBlobStoreFile(File base, boolean isTmp, boolean mustBeNew) {
-        _key = base.getName();
-        _isTmp = isTmp;
-        _mustBeNew = mustBeNew;
-        if (_isTmp) {
-            _path = new File(base, System.currentTimeMillis()+TMP_EXT);
-        } else {
-            _path = new File(base, BlobStoreFile.BLOBSTORE_DATA_FILE);
-        }
-    }
-
-    @Override
-    public void delete() throws IOException {
-        _path.delete();
-    }
-
-    @Override
-    public boolean isTmp() {
-        return _isTmp;
-    }
-
-    @Override
-    public String getKey() {
-        return _key;
-    }
-
-    @Override
-    public long getModTime() throws IOException {
-        if (_modTime == null) {
-            _modTime = _path.lastModified();
-        }
-        return _modTime;
-    }
-
-    @Override
-    public InputStream getInputStream() throws IOException {
-        if (isTmp()) {
-            throw new IllegalStateException("Cannot read from a temporary part 
file.");
-        }
-        return new FileInputStream(_path);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-        if (!isTmp()) {
-            throw new IllegalStateException("Can only write to a temporary 
part file.");
-        }
-        boolean success = false;
-        try {
-            success = _path.createNewFile();
-        } catch (IOException e) {
-            //Try to create the parent directory, may not work
-            _path.getParentFile().mkdirs();
-            success = _path.createNewFile();
-        }
-        if (!success) {
-            throw new IOException(_path+" already exists");
-        }
-        return new FileOutputStream(_path);
-    }
-
-    @Override
-    public void commit() throws IOException {
-        if (!isTmp()) {
-            throw new IllegalStateException("Can only write to a temporary part file.");
-        }
-
-        File dest = new File(_path.getParentFile(), BlobStoreFile.BLOBSTORE_DATA_FILE);
-        if (_mustBeNew) {
-            Files.move(_path.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
-        } else {
-            Files.move(_path.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
-        }
-    }
-
-    @Override
-    public void cancel() throws IOException {
-        if (!isTmp()) {
-            throw new IllegalStateException("Can only write to a temporary part file.");
-        }
-        delete();
-    }
-
-    @Override
-    public SettableBlobMeta getMetadata () {
-        return meta;
-    }
-
-    @Override
-    public void setMetadata (SettableBlobMeta meta) {
-        this.meta = meta;
-    }
-
-    @Override
-    public String toString() {
-        return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
-    }
-
-    @Override
-    public long getFileLength() {
-        return _path.length();
-    }
-}
-

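The deleted LocalFsBlobStoreFile is the write-to-temp-then-atomic-rename half of the local blob store: writers get an OutputStream only on a timestamped .tmp part file, and commit() publishes it under BLOBSTORE_DATA_FILE via Files.move(..., ATOMIC_MOVE). A self-contained sketch of that protocol, using plain java.nio.file and illustrative names rather than the Storm classes:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.nio.file.StandardOpenOption;

    public class AtomicCommitSketch {
        public static void main(String[] args) throws IOException {
            Path blobDir = Files.createTempDirectory("blob-demo"); // stands in for the per-key directory
            Path tmp = blobDir.resolve(System.currentTimeMillis() + ".tmp"); // timestamped part file
            Path data = blobDir.resolve("data"); // stands in for BLOBSTORE_DATA_FILE

            // Writers only ever touch the part file; CREATE_NEW mirrors the
            // createNewFile() check that rejects an already-existing path.
            Files.write(tmp, "payload".getBytes(), StandardOpenOption.CREATE_NEW);

            // commit(): an atomic rename publishes the blob, so a concurrent
            // reader sees either the old data file or the new one, never a
            // half-written file. REPLACE_EXISTING is dropped when the blob
            // must be new, which makes a duplicate create fail loudly.
            Files.move(tmp, data, StandardCopyOption.ATOMIC_MOVE,
                    StandardCopyOption.REPLACE_EXISTING);
            System.out.println("committed " + Files.size(data) + " bytes");
        }
    }
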
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java
deleted file mode 100644
index 334e6bb..0000000
--- a/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.blobstore;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.BeginDownloadResult;
-import backtype.storm.generated.ListBlobsResult;
-import backtype.storm.generated.ReadableBlobMeta;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.generated.KeyAlreadyExistsException;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * NimbusBlobStore is a USER facing client API to perform
- * basic operations such as create, update, delete and read
- * for local and hdfs blob store.
- *
- * For local blob store it is also the client facing API for
- * supervisor in order to download blobs from nimbus.
- */
-public class NimbusBlobStore extends ClientBlobStore {
-    private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class);
-
-    public class NimbusKeyIterator implements Iterator<String> {
-        private ListBlobsResult listBlobs = null;
-        private int offset = 0;
-        private boolean eof = false;
-
-        public NimbusKeyIterator(ListBlobsResult listBlobs) {
-            this.listBlobs = listBlobs;
-            this.eof = (listBlobs.get_keys_size() == 0);
-        }
-
-        private boolean isCacheEmpty() {
-            return listBlobs.get_keys_size() <= offset;
-        }
-
-        private void readMore() throws TException {
-            if (!eof) {
-                offset = 0;
-                synchronized(client) {
-                    listBlobs = client.getClient().listBlobs(listBlobs.get_session());
-                }
-                if (listBlobs.get_keys_size() == 0) {
-                    eof = true;
-                }
-            }
-        }
-
-        @Override
-        public synchronized boolean hasNext() {
-            try {
-                if (isCacheEmpty()) {
-                    readMore();
-                }
-            } catch (TException e) {
-                throw new RuntimeException(e);
-            }
-            return !eof;
-        }
-
-        @Override
-        public synchronized String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            String ret = listBlobs.get_keys().get(offset);
-            offset++;
-            return ret;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Delete Not Supported");
-        }
-    }
-
-    public class NimbusDownloadInputStream extends InputStreamWithMeta {
-        private BeginDownloadResult beginBlobDownload;
-        private byte[] buffer = null;
-        private int offset = 0;
-        private int end = 0;
-        private boolean eof = false;
-
-        public NimbusDownloadInputStream(BeginDownloadResult beginBlobDownload) {
-            this.beginBlobDownload = beginBlobDownload;
-        }
-
-        @Override
-        public long getVersion() throws IOException {
-            return beginBlobDownload.get_version();
-        }
-
-        @Override
-        public synchronized int read() throws IOException {
-            try {
-                if (isEmpty()) {
-                    readMore();
-                    if (eof) {
-                        return -1;
-                    }
-                }
-                int length = Math.min(1, available());
-                if (length == 0) {
-                    return -1;
-                }
-                int ret = buffer[offset];
-                offset += length;
-                return ret;
-            } catch(TException exp) {
-                throw new IOException(exp);
-            }
-        }
-
-        @Override
-        public synchronized int read(byte[] b, int off, int len) throws IOException {
-            try {
-                if (isEmpty()) {
-                    readMore();
-                    if (eof) {
-                        return -1;
-                    }
-                }
-                int length = Math.min(len, available());
-                System.arraycopy(buffer, offset, b, off, length);
-                offset += length;
-                return length;
-            } catch(TException exp) {
-                throw new IOException(exp);
-            }
-        }
-
-        private boolean isEmpty() {
-            return buffer == null || offset >= end;
-        }
-
-        private void readMore() throws TException {
-            if (!eof) {
-                ByteBuffer buff;
-                synchronized(client) {
-                    buff = client.getClient().downloadBlobChunk(beginBlobDownload.get_session());
-                }
-                buffer = buff.array();
-                offset = buff.arrayOffset() + buff.position();
-                int length = buff.remaining();
-                end = offset + length;
-                if (length == 0) {
-                    eof = true;
-                }
-            }
-        }
-
-        @Override
-        public synchronized int read(byte[] b) throws IOException {
-            return read(b, 0, b.length);
-        }
-
-        @Override
-        public synchronized int available() {
-            return buffer == null ? 0 : (end - offset);
-        }
-
-        @Override
-        public long getFileLength() {
-            return beginBlobDownload.get_data_size();
-        }
-    }
-
-    public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
-        private String session;
-        private int maxChunkSize = 4096;
-        private String key;
-
-        public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
-            this.session = session;
-            this.maxChunkSize = bufferSize;
-            this.key = key;
-        }
-
-        @Override
-        public void cancel() throws IOException {
-            try {
-                synchronized(client) {
-                    client.getClient().cancelBlobUpload(session);
-                }
-            } catch (TException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void write(int b) throws IOException {
-            try {
-                synchronized(client) {
-                    client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
-                }
-            } catch (TException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void write(byte []b) throws IOException {
-            write(b, 0, b.length);
-        }
-
-        @Override
-        public void write(byte []b, int offset, int len) throws IOException {
-            try {
-                int end = offset + len;
-                for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
-                    int realLen = Math.min(end - realOffset, maxChunkSize);
-                    LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset));
-                    synchronized(client) {
-                        client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
-                    }
-                }
-            } catch (TException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            try {
-                synchronized(client) {
-                    client.getClient().finishBlobUpload(session);
-                    client.getClient().createStateInZookeeper(key);
-                }
-            } catch (TException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private NimbusClient client;
-    private int bufferSize = 4096;
-
-    @Override
-    public void prepare(Map conf) {
-        this.client = NimbusClient.getConfiguredClient(conf);
-        if (conf != null) {
-            this.bufferSize = Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
-        }
-    }
-
-    @Override
-    protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
-            throws AuthorizationException, KeyAlreadyExistsException {
-        try {
-            synchronized(client) {
-                return new NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), this.bufferSize, key);
-            }
-        } catch (AuthorizationException | KeyAlreadyExistsException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public AtomicOutputStream updateBlob(String key)
-            throws AuthorizationException, KeyNotFoundException {
-        try {
-            synchronized(client) {
-                return new NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), this.bufferSize, key);
-            }
-        } catch (AuthorizationException | KeyNotFoundException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException {
-        try {
-            synchronized(client) {
-                return client.getClient().getBlobMeta(key);
-            }
-        } catch (AuthorizationException | KeyNotFoundException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    protected void setBlobMetaToExtend(String key, SettableBlobMeta meta)
-            throws AuthorizationException, KeyNotFoundException {
-        try {
-            synchronized(client) {
-                client.getClient().setBlobMeta(key, meta);
-            }
-        } catch (AuthorizationException | KeyNotFoundException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
-        try {
-            synchronized(client) {
-                client.getClient().deleteBlob(key);
-            }
-        } catch (AuthorizationException | KeyNotFoundException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void createStateInZookeeper(String key) {
-        try {
-            synchronized(client) {
-                client.getClient().createStateInZookeeper(key);
-            }
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException {
-        try {
-            synchronized(client) {
-                return new NimbusDownloadInputStream(client.getClient().beginBlobDownload(key));
-            }
-        } catch (AuthorizationException | KeyNotFoundException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Iterator<String> listKeys() {
-        try {
-            synchronized(client) {
-                return new NimbusKeyIterator(client.getClient().listBlobs(""));
-            }
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
-        try {
-            return client.getClient().getBlobReplication(key);
-        } catch (AuthorizationException | KeyNotFoundException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
-        try {
-            return client.getClient().updateBlobReplication(key, replication);
-        } catch (AuthorizationException | KeyNotFoundException exp) {
-            throw exp;
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public boolean setClient(Map conf, NimbusClient client) {
-        this.client = client;
-        if (conf != null) {
-            this.bufferSize = Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
-        }
-        return true;
-    }
-
-    @Override
-    protected void finalize() {
-        shutdown();
-    }
-
-    @Override
-    public void shutdown() {
-        if (client != null) {
-            client.close();
-            client = null;
-        }
-    }
-}

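Two patterns in the deleted NimbusBlobStore are worth noting: the key iterator and download stream both page lazily over a Thrift session, and NimbusUploadAtomicOutputStream.write(byte[], int, int) slices an arbitrarily large buffer into at-most-maxChunkSize RPC calls. A minimal sketch of that slicing loop, with a plain Consumer standing in for the Thrift uploadBlobChunk call:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    public class ChunkedWriteSketch {
        // Mirrors the deleted write(byte[], int, int): walk the range in
        // maxChunkSize steps, sending one wrapped slice per step so no
        // single RPC carries more than maxChunkSize bytes.
        static void writeChunked(byte[] b, int offset, int len, int maxChunkSize,
                                 Consumer<ByteBuffer> sendChunk) {
            int end = offset + len;
            for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                int realLen = Math.min(end - realOffset, maxChunkSize);
                sendChunk.accept(ByteBuffer.wrap(b, realOffset, realLen));
            }
        }

        public static void main(String[] args) {
            List<Integer> sizes = new ArrayList<>();
            writeChunked(new byte[10_000], 0, 10_000, 4096, buf -> sizes.add(buf.remaining()));
            System.out.println(sizes); // [4096, 4096, 1808]
        }
    }
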
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java b/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java
deleted file mode 100644
index 5de9bde..0000000
--- a/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.clojure;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class ClojureBolt implements IRichBolt, FinishedCallback {
-    Map<String, StreamInfo> _fields;
-    List<String> _fnSpec;
-    List<String> _confSpec;
-    List<Object> _params;
-    
-    IBolt _bolt;
-    
-    public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
-        _fnSpec = fnSpec;
-        _confSpec = confSpec;
-        _params = params;
-        _fields = fields;
-    }
-
-    @Override
-    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
-        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-        try {
-            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(stormConf);
-                add(context);
-                add(collectorMap);
-            }};
-            
-            _bolt = (IBolt) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
-            try {
-                _bolt.prepare(stormConf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        _bolt.execute(input);
-    }
-
-    @Override
-    public void cleanup() {
-            try {
-                _bolt.cleanup();
-            } catch(AbstractMethodError ame) {
-                
-            }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
-            StreamInfo info = _fields.get(stream);
-            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
-        }
-    }
-
-    @Override
-    public void finishedId(Object id) {
-        if(_bolt instanceof FinishedCallback) {
-            ((FinishedCallback) _bolt).finishedId(id);
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-        try {
-            return (Map) hof.applyTo(RT.seq(_params));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

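The core trick in the deleted ClojureBolt.prepare() is double application: the fn named by _fnSpec is first applied to _params to obtain a preparer, and the preparer is then applied to (conf, context, collector map) to produce the actual IBolt. The same shape in plain Java, with hypothetical functional types in place of the Clojure runtime:

    import java.util.List;
    import java.util.function.Function;

    public class HigherOrderPrepareSketch {
        interface Bolt { void execute(String tuple); }

        public static void main(String[] args) {
            // Stand-in for Utils.loadClojureFn: applying the constructor-time
            // params yields the preparer, which closes over them.
            Function<List<Object>, Function<String, Bolt>> hof =
                params -> conf -> tuple ->
                    System.out.println("conf=" + conf + " params=" + params + " tuple=" + tuple);

            // prepare(): params first, runtime arguments second.
            Bolt bolt = hof.apply(List.of("param-1")).apply("topology-conf");
            bolt.execute("hello");
        }
    }
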
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java b/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java
deleted file mode 100644
index f6422e3..0000000
--- a/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.spout.ISpout;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ClojureSpout implements IRichSpout {
-    Map<String, StreamInfo> _fields;
-    List<String> _fnSpec;
-    List<String> _confSpec;
-    List<Object> _params;
-    
-    ISpout _spout;
-    
-    public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
-        _fnSpec = fnSpec;
-        _confSpec = confSpec;
-        _params = params;
-        _fields = fields;
-    }
-    
-
-    @Override
-    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-        try {
-            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(conf);
-                add(context);
-                add(collectorMap);
-            }};
-            
-            _spout = (ISpout) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
-            try {
-                _spout.open(conf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            _spout.close();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        try {
-            _spout.nextTuple();
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        try {
-            _spout.ack(msgId);
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        try {
-            _spout.fail(msgId);
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
-            StreamInfo info = _fields.get(stream);
-            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
-        }
-    }
-    
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-        try {
-            return (Map) hof.applyTo(RT.seq(_params));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void activate() {
-        try {
-            _spout.activate();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-
-    @Override
-    public void deactivate() {
-        try {
-            _spout.deactivate();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-}

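ClojureSpout (like ClojureBolt above) wraps every optional lifecycle call in an empty catch (AbstractMethodError) block, so an ISpout compiled before a method such as activate() was added to the interface still runs instead of crashing the worker. The guard, reduced to a sketch with a hypothetical interface:

    public class OptionalLifecycleSketch {
        interface Lifecycle { void activate(); }

        // Binary-compatibility guard: if the loaded implementation predates
        // activate(), the JVM raises AbstractMethodError at the call site,
        // and the hook is treated as a no-op, as in the deleted class.
        static void safeActivate(Lifecycle impl) {
            try {
                impl.activate();
            } catch (AbstractMethodError ame) {
                // implementation is older than the interface; ignore
            }
        }

        public static void main(String[] args) {
            safeActivate(() -> System.out.println("activated"));
        }
    }
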
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
deleted file mode 100644
index a155008..0000000
--- a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellBolt extends ShellBolt implements IRichBolt {
-    private Map<String, StreamInfo> _outputs;
-    
-    public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
-        super(command);
-        _outputs = outputs;
-    }
-    
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
-            StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
-                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
-            } else {
-                declarer.declareStream(stream, new Fields(def.get_output_fields()));
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
deleted file mode 100644
index b49fbef..0000000
--- a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.spout.ShellSpout;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellSpout extends ShellSpout implements IRichSpout {
-    private Map<String, StreamInfo> _outputs;
-
-    public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
-        super(command);
-        _outputs = outputs;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
-            StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
-                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
-            } else {
-                declarer.declareStream(stream, new Fields(def.get_output_fields()));
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-}

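RichShellBolt and RichShellSpout carry identical declareOutputFields bodies: a direct stream is declared through the three-argument declareStream overload, anything else through the two-argument one. The branch, sketched against a stand-in declarer rather than Storm's OutputFieldsDeclarer:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class DeclareStreamsSketch {
        record StreamDef(boolean direct, List<String> fields) {}

        static void declare(Map<String, StreamDef> outputs) {
            for (Map.Entry<String, StreamDef> e : outputs.entrySet()) {
                StreamDef def = e.getValue();
                if (def.direct()) {
                    // direct stream: consumers receive tuples addressed by task id
                    System.out.println(e.getKey() + " [direct] " + def.fields());
                } else {
                    System.out.println(e.getKey() + " " + def.fields());
                }
            }
        }

        public static void main(String[] args) {
            Map<String, StreamDef> outputs = new LinkedHashMap<>();
            outputs.put("default", new StreamDef(false, List.of("word", "count")));
            outputs.put("updates", new StreamDef(true, List.of("id")));
            declare(outputs);
        }
    }
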