http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index f3842b0..9fdd447 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
 import java.io.IOException;
@@ -45,7 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BlobStoreUtils {
-    private static final String BLOBSTORE_SUBTREE="/blobstore";
+    private static final String BLOBSTORE_SUBTREE = "/blobstore";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreUtils.class);
 
@@ -59,7 +54,8 @@ public class BlobStoreUtils {
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
         CuratorFramework zkClient = CuratorUtils.newCurator(conf, zkServers, 
port,
-            (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo, 
type.getDefaultZkAcls(conf));
+                                                            (String) 
conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo,
+                                                            
type.getDefaultZkAcls(conf));
         zkClient.start();
         return zkClient;
     }
@@ -94,7 +90,7 @@ public class BlobStoreUtils {
         int latestSeqNumber = getLatestSequenceNumber(stateInfoList);
         LOG.debug("getNimbodesWithLatestSequenceNumberOfBlob stateInfo {} 
version {}", stateInfoList, latestSeqNumber);
         // Get the nimbodes with the latest version
-        for(String state : stateInfoList) {
+        for (String state : stateInfoList) {
             BlobKeySequenceInfo sequenceInfo = 
normalizeNimbusHostPortSequenceNumberInfo(state);
             if (latestSeqNumber == 
Integer.parseInt(sequenceInfo.getSequenceNumber())) {
                 
nimbusInfoSet.add(NimbusInfo.parse(sequenceInfo.getNimbusHostPort()));
@@ -122,14 +118,14 @@ public class BlobStoreUtils {
 
     // Download missing blobs from potential nimbodes
     public static boolean downloadMissingBlob(Map<String, Object> conf, 
BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
-            throws TTransportException {
+        throws TTransportException {
         ReadableBlobMeta rbm;
         ClientBlobStore remoteBlobStore;
         InputStreamWithMeta in;
         boolean isSuccess = false;
         LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
         for (NimbusInfo nimbusInfo : nimbusInfos) {
-            if(isSuccess) {
+            if (isSuccess) {
                 break;
             }
             LOG.debug("Download blob key: {}, NimbusInfo {}", key, nimbusInfo);
@@ -171,7 +167,7 @@ public class BlobStoreUtils {
 
     // Download updated blobs from potential nimbodes
     public static boolean downloadUpdatedBlob(Map<String, Object> conf, 
BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
-            throws TTransportException {
+        throws TTransportException {
         ClientBlobStore remoteBlobStore;
         InputStreamWithMeta in;
         AtomicOutputStream out;
@@ -233,7 +229,8 @@ public class BlobStoreUtils {
         cb.createStateInZookeeper(key);
     }
 
-    public static void updateKeyForBlobStore (Map<String, Object> conf, 
BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo 
nimbusDetails) {
+    public static void updateKeyForBlobStore(Map<String, Object> conf, 
BlobStore blobStore, CuratorFramework zkClient, String key,
+                                             NimbusInfo nimbusDetails) {
         try {
             // Most of clojure tests currently try to access the blobs using 
getBlob. Since, updateKeyForBlobStore
             // checks for updating the correct version of the blob as a part 
of nimbus ha before performing any

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
index 193ccac..19351b1 100644
--- 
a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
+++ 
b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -1,30 +1,24 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
 import java.nio.channels.ClosedByInterruptException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,14 +44,6 @@ public class BlobSynchronizer {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public void setZookeeperKeySet(Set<String> zookeeperKeySet) {
-        this.zookeeperKeySet = zookeeperKeySet;
-    }
-
-    public void setBlobStoreKeySet(Set<String> blobStoreKeySet) {
-        this.blobStoreKeySet = blobStoreKeySet;
-    }
-
     public void setZkClient(CuratorFramework zkClient) {
         this.zkClient = zkClient;
     }
@@ -68,19 +54,28 @@ public class BlobSynchronizer {
         return keySet;
     }
 
+    public void setBlobStoreKeySet(Set<String> blobStoreKeySet) {
+        this.blobStoreKeySet = blobStoreKeySet;
+    }
+
     public Set<String> getZookeeperKeySet() {
         Set<String> keySet = new HashSet<String>();
         keySet.addAll(zookeeperKeySet);
         return keySet;
     }
 
+    public void setZookeeperKeySet(Set<String> zookeeperKeySet) {
+        this.zookeeperKeySet = zookeeperKeySet;
+    }
+
     public synchronized void syncBlobs() {
         try {
-            LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys 
{}",getBlobStoreKeySet(), getZookeeperKeySet());
+            LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}", 
getBlobStoreKeySet(), getZookeeperKeySet());
             deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), 
getZookeeperKeySet());
             updateKeySetForBlobStore(getBlobStoreKeySet());
             Set<String> keySetToDownload = 
getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
-            LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> 
{}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);
+            LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> 
{}", getBlobStoreKeySet(), getZookeeperKeySet(),
+                      keySetToDownload);
 
             for (String key : keySetToDownload) {
                 try {
@@ -93,16 +88,16 @@ public class BlobSynchronizer {
                     LOG.debug("Detected deletion for the key {} while 
downloading - skipping download", key);
                 }
             }
-        } catch(InterruptedException | ClosedByInterruptException exp) {
+        } catch (InterruptedException | ClosedByInterruptException exp) {
             LOG.error("Interrupt Exception {}", exp);
-        } catch(Exception exp) {
+        } catch (Exception exp) {
             throw new RuntimeException(exp);
         }
     }
 
     public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> 
keySetBlobStore, Set<String> keySetZookeeper) throws Exception {
         if (keySetBlobStore.removeAll(keySetZookeeper)
-                || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) {
+            || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) {
             LOG.debug("Key set to delete in blobstore {}", keySetBlobStore);
             for (String key : keySetBlobStore) {
                 blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java
index 20a389f..782a5bf 100644
--- 
a/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java
+++ 
b/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -34,7 +29,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.storm.Config;
 import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
@@ -48,71 +42,8 @@ public class FileBlobStoreImpl {
     private static final int BUCKETS = 1024;
     private static final Logger LOG = 
LoggerFactory.getLogger(FileBlobStoreImpl.class);
     private static final Timer timer = new Timer("FileBlobStore cleanup 
thread", true);
-
-    public class KeyInHashDirIterator implements Iterator<String> {
-        private int currentBucket = 0;
-        private Iterator<String> it = null;
-        private String next = null;
-
-        public KeyInHashDirIterator() throws IOException {
-            primeNext();
-        }
-
-        private void primeNext() throws IOException {
-            while (it == null && currentBucket < BUCKETS) {
-                String name = String.valueOf(currentBucket);
-                File dir = new File(fullPath, name);
-                try {
-                    it = listKeys(dir);
-                } catch (FileNotFoundException e) {
-                    it = null;
-                }
-                if (it == null || !it.hasNext()) {
-                    it = null;
-                    currentBucket++;
-                } else {
-                    next = it.next();
-                }
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return next != null;
-        }
-
-        @Override
-        public String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            String current = next;
-            next = null;
-            if (it != null) {
-                if (!it.hasNext()) {
-                    it = null;
-                    currentBucket++;
-                    try {
-                        primeNext();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                } else {
-                    next = it.next();
-                }
-            }
-            return current;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Delete Not Supported");
-        }
-    }
-
     private File fullPath;
     private TimerTask cleanup = null;
-
     public FileBlobStoreImpl(File path, Map<String, Object> conf) throws 
IOException {
         LOG.info("Creating new blob store based in {}", path);
         fullPath = path;
@@ -185,9 +116,9 @@ public class FileBlobStoreImpl {
 
     @VisibleForTesting
     File getKeyDir(String key) {
-        String hash = String.valueOf(Math.abs((long)key.hashCode()) % BUCKETS);
+        String hash = String.valueOf(Math.abs((long) key.hashCode()) % 
BUCKETS);
         File ret = new File(new File(fullPath, hash), key);
-        LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, 
hash});
+        LOG.debug("{} Looking for {} in {}", new Object[]{ fullPath, key, hash 
});
         return ret;
     }
 
@@ -203,7 +134,7 @@ public class FileBlobStoreImpl {
                 try {
                     keyDir.delete();
                 } catch (Exception e) {
-                    LOG.warn("Could not delete "+keyDir+" will try again 
later");
+                    LOG.warn("Could not delete " + keyDir + " will try again 
later");
                 }
             }
             while (i.hasNext()) {
@@ -221,12 +152,12 @@ public class FileBlobStoreImpl {
         ArrayList<LocalFsBlobStoreFile> ret = new 
ArrayList<LocalFsBlobStoreFile>();
         File[] files = path.listFiles();
         if (files != null) {
-            for (File sub: files) {
+            for (File sub : files) {
                 try {
                     ret.add(new LocalFsBlobStoreFile(sub.getParentFile(), 
sub.getName()));
                 } catch (IllegalArgumentException e) {
                     //Ignored the file did not match
-                    LOG.warn("Found an unexpected file in {} {}",path, 
sub.getName());
+                    LOG.warn("Found an unexpected file in {} {}", path, 
sub.getName());
                 }
             }
         }
@@ -267,4 +198,65 @@ public class FileBlobStoreImpl {
             cleanup = null;
         }
     }
+
+    public class KeyInHashDirIterator implements Iterator<String> {
+        private int currentBucket = 0;
+        private Iterator<String> it = null;
+        private String next = null;
+
+        public KeyInHashDirIterator() throws IOException {
+            primeNext();
+        }
+
+        private void primeNext() throws IOException {
+            while (it == null && currentBucket < BUCKETS) {
+                String name = String.valueOf(currentBucket);
+                File dir = new File(fullPath, name);
+                try {
+                    it = listKeys(dir);
+                } catch (FileNotFoundException e) {
+                    it = null;
+                }
+                if (it == null || !it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                } else {
+                    next = it.next();
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            next = null;
+            if (it != null) {
+                if (!it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                    try {
+                        primeNext();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    next = it.next();
+                }
+            }
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
index 570e0ad..31ffae9 100644
--- 
a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
+++ 
b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -1,28 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.blobstore;
 
 import java.nio.ByteBuffer;
-import java.util.TreeSet;
 import java.util.List;
-import java.util.Map;
-
+import java.util.TreeSet;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
@@ -119,7 +111,7 @@ import org.slf4j.LoggerFactory;
  */
 public class KeySequenceNumber {
     private static final Logger LOG = 
LoggerFactory.getLogger(KeySequenceNumber.class);
-    private final String 
BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber";
+    private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE = 
"/blobstoremaxkeysequencenumber";
     private final String key;
     private final NimbusInfo nimbusInfo;
     private final int INT_CAPACITY = 4;
@@ -138,7 +130,7 @@ public class KeySequenceNumber {
                 
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                         
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE
 + "/" + key);
                 zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE 
+ "/" + key,
-                        
ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array());
+                                           
ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array());
                 return INITIAL_SEQUENCE_NUMBER;
             }
 
@@ -157,7 +149,7 @@ public class KeySequenceNumber {
             // if not assign the highest sequence number.
             for (String stateInfo : stateInfoList) {
                 
sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo)
-                        .getSequenceNumber()));
+                                                                   
.getSequenceNumber()));
             }
 
             // Update scenario 2 and 3 explain the code logic written here
@@ -201,7 +193,7 @@ public class KeySequenceNumber {
             // there's a race condition with a delete: either blobstore or 
blobstoremaxsequence
             // this should be thrown to the caller to indicate that the key is 
invalid now
             throw new KeyNotFoundException(key);
-        } catch(Exception e) {
+        } catch (Exception e) {
             // in other case, just set this to 0 to trigger re-sync later
             LOG.error("Exception {}", e);
             return INITIAL_SEQUENCE_NUMBER - 1;
@@ -210,8 +202,8 @@ public class KeySequenceNumber {
 
     private boolean checkIfStateContainsCurrentNimbusHost(List<String> 
stateInfoList, NimbusInfo nimbusInfo) {
         boolean containsNimbusHost = false;
-        for(String stateInfo:stateInfoList) {
-            if(stateInfo.contains(nimbusInfo.getHost())) {
+        for (String stateInfo : stateInfoList) {
+            if (stateInfo.contains(nimbusInfo.getHost())) {
                 containsNimbusHost = true;
                 break;
             }
@@ -221,11 +213,11 @@ public class KeySequenceNumber {
 
     private void incrementMaxSequenceNumber(CuratorFramework zkClient, int 
count) throws Exception {
         zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + 
key,
-                ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array());
+                                   
ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array());
     }
 
     private int getMaxSequenceNumber(CuratorFramework zkClient) throws 
Exception {
         return ByteBuffer.wrap(zkClient.getData()
-                .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + 
key)).getInt();
+                                       
.forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key)).getInt();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index 831fb78..6da1f47 100644
--- 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -1,55 +1,46 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.security.auth.Subject;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.cluster.DaemonType;
-import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.ReadableBlobMeta;
-
+import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN;
 import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
 import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Provides a local file system backed blob store implementation for Nimbus.
  *
@@ -72,11 +63,11 @@ public class LocalFsBlobStore extends BlobStore {
     public static final Logger LOG = 
LoggerFactory.getLogger(LocalFsBlobStore.class);
     private static final String DATA_PREFIX = "data_";
     private static final String META_PREFIX = "meta_";
-    protected BlobStoreAclHandler _aclHandler;
     private final String BLOBSTORE_SUBTREE = "/blobstore/";
+    private final int allPermissions = READ | WRITE | ADMIN;
+    protected BlobStoreAclHandler _aclHandler;
     private NimbusInfo nimbusInfo;
     private FileBlobStoreImpl fbs;
-    private final int allPermissions = READ | WRITE | ADMIN;
     private Map<String, Object> conf;
     private CuratorFramework zkClient;
 
@@ -98,22 +89,23 @@ public class LocalFsBlobStore extends BlobStore {
     }
 
     @Override
-    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, 
Subject who) throws AuthorizationException, KeyAlreadyExistsException {
+    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, 
Subject who) throws AuthorizationException,
+        KeyAlreadyExistsException {
         LOG.debug("Creating Blob for key {}", key);
         validateKey(key);
         _aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions);
         BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
         _aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key);
-        if (fbs.exists(DATA_PREFIX+key)) {
+        if (fbs.exists(DATA_PREFIX + key)) {
             throw new KeyAlreadyExistsException(key);
         }
         BlobStoreFileOutputStream mOut = null;
         try {
-            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, 
true));
+            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX + key, 
true));
             mOut.write(Utils.thriftSerialize(meta));
             mOut.close();
             mOut = null;
-            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, 
true));
+            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, 
true));
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {
@@ -132,7 +124,7 @@ public class LocalFsBlobStore extends BlobStore {
         validateKey(key);
         checkPermission(key, who, WRITE);
         try {
-            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, 
false));
+            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, 
false));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -141,14 +133,14 @@ public class LocalFsBlobStore extends BlobStore {
     private SettableBlobMeta getStoredBlobMeta(String key) throws 
KeyNotFoundException {
         InputStream in = null;
         try {
-            LocalFsBlobStoreFile pf = fbs.read(META_PREFIX+key);
+            LocalFsBlobStoreFile pf = fbs.read(META_PREFIX + key);
             try {
                 in = pf.getInputStream();
             } catch (FileNotFoundException fnf) {
                 throw new KeyNotFoundException(key);
             }
             ByteArrayOutputStream out = new ByteArrayOutputStream();
-            byte [] buffer = new byte[2048];
+            byte[] buffer = new byte[2048];
             int len;
             while ((len = in.read(buffer)) > 0) {
                 out.write(buffer, 0, len);
@@ -172,7 +164,7 @@ public class LocalFsBlobStore extends BlobStore {
     @Override
     public ReadableBlobMeta getBlobMeta(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
         validateKey(key);
-        if(!checkForBlobOrDownload(key)) {
+        if (!checkForBlobOrDownload(key)) {
             checkForBlobUpdate(key);
         }
         SettableBlobMeta meta = getStoredBlobMeta(key);
@@ -180,7 +172,7 @@ public class LocalFsBlobStore extends BlobStore {
         ReadableBlobMeta rbm = new ReadableBlobMeta();
         rbm.set_settable(meta);
         try {
-            LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX+key);
+            LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX + key);
             rbm.set_version(pf.getModTime());
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -198,7 +190,7 @@ public class LocalFsBlobStore extends BlobStore {
         _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
         BlobStoreFileOutputStream mOut = null;
         try {
-            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, 
false));
+            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX + key, 
false));
             mOut.write(Utils.thriftSerialize(meta));
             mOut.close();
             mOut = null;
@@ -233,7 +225,7 @@ public class LocalFsBlobStore extends BlobStore {
             // able to delete the blob without checking meta's ACL
             // skip checking everything and continue deleting local files
             LOG.debug("Given subject is eligible to delete key without 
checking ACL, skipping... key: {} subject: {}",
-                    key, who);
+                      key, who);
         }
 
         try {
@@ -265,13 +257,13 @@ public class LocalFsBlobStore extends BlobStore {
     @Override
     public InputStreamWithMeta getBlob(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
         validateKey(key);
-        if(!checkForBlobOrDownload(key)) {
+        if (!checkForBlobOrDownload(key)) {
             checkForBlobUpdate(key);
         }
         SettableBlobMeta meta = getStoredBlobMeta(key);
         _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
         try {
-            return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX+key));
+            return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX + key));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -314,7 +306,7 @@ public class LocalFsBlobStore extends BlobStore {
     @Override
     public int updateBlobReplication(String key, int replication, Subject who) 
throws AuthorizationException, KeyNotFoundException {
         throw new UnsupportedOperationException("For local file system blob 
store the update blobs function does not work. " +
-                "Please use HDFS blob store to make this feature available.");
+                                                "Please use HDFS blob store to 
make this feature available.");
     }
 
     //This additional check and download is for nimbus high availability in 
case you have more than one nimbus
@@ -347,7 +339,7 @@ public class LocalFsBlobStore extends BlobStore {
     public void fullCleanup(long age) throws IOException {
         fbs.fullCleanup(age);
     }
-    
+
     @VisibleForTesting
     File getKeyDataDir(String key) {
         return fbs.getKeyDir(DATA_PREFIX + key);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java
 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java
index 1b7c969..d160ba6 100644
--- 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java
+++ 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java
@@ -1,23 +1,16 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.blobstore;
 
-import org.apache.storm.generated.SettableBlobMeta;
+package org.apache.storm.blobstore;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -28,14 +21,15 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.regex.Matcher;
+import org.apache.storm.generated.SettableBlobMeta;
 
 public class LocalFsBlobStoreFile extends BlobStoreFile {
 
     private final String _key;
     private final boolean _isTmp;
     private final File _path;
-    private Long _modTime = null;
     private final boolean _mustBeNew;
+    private Long _modTime = null;
     private SettableBlobMeta meta;
 
     public LocalFsBlobStoreFile(File base, String name) {
@@ -44,7 +38,7 @@ public class LocalFsBlobStoreFile extends BlobStoreFile {
         } else {
             Matcher m = TMP_NAME_PATTERN.matcher(name);
             if (!m.matches()) {
-                throw new IllegalArgumentException("File name does not match 
'"+name+"' !~ "+TMP_NAME_PATTERN);
+                throw new IllegalArgumentException("File name does not match 
'" + name + "' !~ " + TMP_NAME_PATTERN);
             }
             _isTmp = true;
         }
@@ -58,7 +52,7 @@ public class LocalFsBlobStoreFile extends BlobStoreFile {
         _isTmp = isTmp;
         _mustBeNew = mustBeNew;
         if (_isTmp) {
-            _path = new File(base, System.currentTimeMillis()+TMP_EXT);
+            _path = new File(base, System.currentTimeMillis() + TMP_EXT);
         } else {
             _path = new File(base, BlobStoreFile.BLOBSTORE_DATA_FILE);
         }
@@ -109,7 +103,7 @@ public class LocalFsBlobStoreFile extends BlobStoreFile {
             success = _path.createNewFile();
         }
         if (!success) {
-            throw new IOException(_path+" already exists");
+            throw new IOException(_path + " already exists");
         }
         return new FileOutputStream(_path);
     }
@@ -137,18 +131,18 @@ public class LocalFsBlobStoreFile extends BlobStoreFile {
     }
 
     @Override
-    public SettableBlobMeta getMetadata () {
+    public SettableBlobMeta getMetadata() {
         return meta;
     }
 
     @Override
-    public void setMetadata (SettableBlobMeta meta) {
+    public void setMetadata(SettableBlobMeta meta) {
         this.meta = meta;
     }
 
     @Override
     public String toString() {
-        return _path+":"+(_isTmp ? "tmp": 
BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
+        return _path + ":" + (_isTmp ? "tmp" : 
BlobStoreFile.BLOBSTORE_DATA_FILE) + ":" + _key;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
 
b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
index 733d2ab..8e2ae3c 100644
--- 
a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
+++ 
b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.container;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
 
b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
index 4079215..a68e18e 100644
--- 
a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
+++ 
b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.container.cgroup;
@@ -34,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
@@ -50,17 +43,40 @@ import org.slf4j.LoggerFactory;
 public class CgroupManager implements ResourceIsolationInterface {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CgroupManager.class);
-
+    private static final Pattern MEMINFO_PATTERN = 
Pattern.compile("^([^:\\s]+):\\s*([0-9]+)\\s*kB$");
     private CgroupCenter center;
-
     private Hierarchy hierarchy;
-
     private CgroupCommon rootCgroup;
-
     private String rootDir;
-
     private Map<String, Object> conf;
 
+    static long getMemInfoFreeMb() throws IOException {
+        //MemFree:        14367072 kB
+        //Buffers:          536512 kB
+        //Cached:          1192096 kB
+        // MemFree + Buffers + Cached
+        long memFree = 0;
+        long buffers = 0;
+        long cached = 0;
+        try (BufferedReader in = new BufferedReader(new 
FileReader("/proc/meminfo"))) {
+            String line = null;
+            while ((line = in.readLine()) != null) {
+                Matcher match = MEMINFO_PATTERN.matcher(line);
+                if (match.matches()) {
+                    String tag = match.group(1);
+                    if (tag.equalsIgnoreCase("MemFree")) {
+                        memFree = Long.parseLong(match.group(2));
+                    } else if (tag.equalsIgnoreCase("Buffers")) {
+                        buffers = Long.parseLong(match.group(2));
+                    } else if (tag.equalsIgnoreCase("Cached")) {
+                        cached = Long.parseLong(match.group(2));
+                    }
+                }
+            }
+        }
+        return (memFree + buffers + cached) / 1024;
+    }
+
     /**
      * initialize data structures.
      *
@@ -102,7 +118,7 @@ public class CgroupManager implements 
ResourceIsolationInterface {
             Set<SubSystemType> types = new HashSet<>();
             types.add(SubSystemType.cpu);
             this.hierarchy = new 
Hierarchy(DaemonConfig.getCgroupStormHierarchyName(conf), types,
-                DaemonConfig.getCgroupStormHierarchyDir(conf));
+                                           
DaemonConfig.getCgroupStormHierarchyDir(conf));
         }
         this.rootCgroup =
             new CgroupCommon(this.rootDir, this.hierarchy, 
this.hierarchy.getRootCgroups());
@@ -194,7 +210,7 @@ public class CgroupManager implements 
ResourceIsolationInterface {
             Set<Integer> tasks = workerGroup.getTasks();
             if (!tasks.isEmpty()) {
                 throw new Exception("Cannot correctly shutdown worker CGroup " 
+ workerId + "tasks " + tasks
-                    + " still running!");
+                                    + " still running!");
             }
             this.center.deleteCgroup(workerGroup);
         } catch (Exception e) {
@@ -254,35 +270,6 @@ public class CgroupManager implements 
ResourceIsolationInterface {
         return memCore.getPhysicalUsage();
     }
 
-    private static final Pattern MEMINFO_PATTERN = 
Pattern.compile("^([^:\\s]+):\\s*([0-9]+)\\s*kB$");
-
-    static long getMemInfoFreeMb() throws IOException {
-        //MemFree:        14367072 kB
-        //Buffers:          536512 kB
-        //Cached:          1192096 kB
-        // MemFree + Buffers + Cached
-        long memFree = 0;
-        long buffers = 0;
-        long cached = 0;
-        try (BufferedReader in = new BufferedReader(new 
FileReader("/proc/meminfo"))) {
-            String line = null;
-            while ((line = in.readLine()) != null) {
-                Matcher match = MEMINFO_PATTERN.matcher(line);
-                if (match.matches()) {
-                    String tag = match.group(1);
-                    if (tag.equalsIgnoreCase("MemFree")) {
-                        memFree = Long.parseLong(match.group(2));
-                    } else if (tag.equalsIgnoreCase("Buffers")) {
-                        buffers = Long.parseLong(match.group(2));
-                    } else if (tag.equalsIgnoreCase("Cached")) {
-                        cached = Long.parseLong(match.group(2));
-                    }
-                }
-            }
-        }
-        return (memFree + buffers + cached) / 1024;
-    }
-
     @Override
     public long getSystemFreeMemoryMb() throws IOException {
         long rootCgroupLimitFree = Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
index 489af28..939c1fc 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
@@ -15,17 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.drpc;
 
 import java.util.concurrent.Semaphore;
-
 import org.apache.storm.generated.DRPCExceptionType;
 import org.apache.storm.generated.DRPCExecutionException;
 import org.apache.storm.generated.DRPCRequest;
 
 public class BlockingOutstandingRequest extends OutstandingRequest {
     public static final RequestFactory<BlockingOutstandingRequest> FACTORY =
-            (function, request) -> new BlockingOutstandingRequest(function, 
request);
+        (function, request) -> new BlockingOutstandingRequest(function, 
request);
     private Semaphore _sem;
     private volatile String _result = null;
     private volatile DRPCExecutionException _e = null;
@@ -49,7 +49,7 @@ public class BlockingOutstandingRequest extends 
OutstandingRequest {
         if (_e == null) {
             _e = new DRPCExecutionException("Internal Error: No Result and No 
Exception");
             _e.set_type(DRPCExceptionType.INTERNAL_ERROR);
-        } 
+        }
         throw _e;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
index 4093961..94d6b3b 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
@@ -15,8 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.drpc;
 
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
 import java.security.Principal;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,7 +29,6 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.generated.AuthorizationException;
@@ -42,26 +44,48 @@ import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
-
 public class DRPC implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(DRPC.class);
-    private static final DRPCRequest NOTHING_REQUEST = new DRPCRequest("","");
+    private static final DRPCRequest NOTHING_REQUEST = new DRPCRequest("", "");
     private static final DRPCExecutionException TIMED_OUT = new 
DRPCExecutionException("Timed Out");
     private static final DRPCExecutionException SHUT_DOWN = new 
DRPCExecutionException("Server Shutting Down");
     private static final DRPCExecutionException DEFAULT_FAILED = new 
DRPCExecutionException("Request failed");
-    static {
-        TIMED_OUT.set_type(DRPCExceptionType.SERVER_TIMEOUT);
-        SHUT_DOWN.set_type(DRPCExceptionType.SERVER_SHUTDOWN);
-        DEFAULT_FAILED.set_type(DRPCExceptionType.FAILED_REQUEST);
-    }
     private static final Meter meterServerTimedOut = 
StormMetricsRegistry.registerMeter("drpc:num-server-timedout-requests");
     private static final Meter meterExecuteCalls = 
StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
     private static final Meter meterResultCalls = 
StormMetricsRegistry.registerMeter("drpc:num-result-calls");
     private static final Meter meterFailRequestCalls = 
StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
     private static final Meter meterFetchRequestCalls = 
StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
-    
+
+    static {
+        TIMED_OUT.set_type(DRPCExceptionType.SERVER_TIMEOUT);
+        SHUT_DOWN.set_type(DRPCExceptionType.SERVER_SHUTDOWN);
+        DEFAULT_FAILED.set_type(DRPCExceptionType.FAILED_REQUEST);
+    }
+
+    //Waiting to be fetched
+    private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<OutstandingRequest>> _queues =
+        new ConcurrentHashMap<>();
+    //Waiting to be returned
+    private final ConcurrentHashMap<String, OutstandingRequest> _requests =
+        new ConcurrentHashMap<>();
+    private final Timer _timer = new Timer();
+    private final AtomicLong _ctr = new AtomicLong(0);
+    private final IAuthorizer _auth;
+
+    public DRPC(Map<String, Object> conf) {
+        this(mkAuthorizationHandler((String) 
conf.get(DaemonConfig.DRPC_AUTHORIZER), conf),
+             
ObjectReader.getInt(conf.get(DaemonConfig.DRPC_REQUEST_TIMEOUT_SECS), 600) * 
1000);
+    }
+    public DRPC(IAuthorizer auth, long timeoutMs) {
+        _auth = auth;
+        _timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                cleanupAll(timeoutMs, TIMED_OUT);
+            }
+        }, timeoutMs / 2, timeoutMs / 2);
+    }
+
     private static IAuthorizer mkAuthorizationHandler(String klassname, 
Map<String, Object> conf) {
         try {
             return StormCommon.mkAuthorizationHandler(klassname, conf);
@@ -76,7 +100,7 @@ public class DRPC implements AutoCloseable {
 
     private static void logAccess(ReqContext reqContext, String operation, 
String function) {
         ThriftAccessLogger.logAccessFunction(reqContext.requestID(), 
reqContext.remoteAddress(), reqContext.principal(), operation,
-            function);
+                                             function);
     }
 
     @VisibleForTesting
@@ -100,33 +124,7 @@ public class DRPC implements AutoCloseable {
             }
         }
     }
-    
-    //Waiting to be fetched
-    private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<OutstandingRequest>> _queues =
-            new ConcurrentHashMap<>();
-    //Waiting to be returned
-    private final ConcurrentHashMap<String, OutstandingRequest> _requests = 
-            new ConcurrentHashMap<>();
-    private final Timer _timer = new Timer();
-    private final AtomicLong _ctr = new AtomicLong(0);
-    private final IAuthorizer _auth;
-    
-    public DRPC(Map<String, Object> conf) {
-        
this(mkAuthorizationHandler((String)conf.get(DaemonConfig.DRPC_AUTHORIZER), 
conf),
-                
ObjectReader.getInt(conf.get(DaemonConfig.DRPC_REQUEST_TIMEOUT_SECS), 600) * 
1000);
-    }
-    
-    public DRPC(IAuthorizer auth, long timeoutMs) {
-        _auth = auth;
-        _timer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                cleanupAll(timeoutMs, TIMED_OUT);
-            }
-        }, timeoutMs/2, timeoutMs/2);
-    }
-    
-    
+
     private void checkAuthorization(String operation, String function) throws 
AuthorizationException {
         checkAuthorization(ReqContext.context(), _auth, operation, function);
     }
@@ -134,7 +132,7 @@ public class DRPC implements AutoCloseable {
     private void checkAuthorizationNoLog(String operation, String function) 
throws AuthorizationException {
         checkAuthorization(ReqContext.context(), _auth, operation, function, 
false);
     }
-    
+
     private void cleanup(String id) {
         OutstandingRequest req = _requests.remove(id);
         if (req != null && !req.wasFetched()) {
@@ -152,7 +150,7 @@ public class DRPC implements AutoCloseable {
             }
         }
     }
-    
+
     private String nextId() {
         return String.valueOf(_ctr.incrementAndGet());
     }
@@ -207,7 +205,8 @@ public class DRPC implements AutoCloseable {
         }
     }
 
-    public <T extends OutstandingRequest> T execute(String functionName, 
String funcArgs, RequestFactory<T> factory) throws AuthorizationException {
+    public <T extends OutstandingRequest> T execute(String functionName, 
String funcArgs, RequestFactory<T> factory) throws
+        AuthorizationException {
         meterExecuteCalls.mark();
         checkAuthorization("execute", functionName);
         String id = nextId();
@@ -218,11 +217,11 @@ public class DRPC implements AutoCloseable {
         q.add(req);
         return req;
     }
-    
+
     public String executeBlocking(String functionName, String funcArgs) throws 
DRPCExecutionException, AuthorizationException {
         BlockingOutstandingRequest req = execute(functionName, funcArgs, 
BlockingOutstandingRequest.FACTORY);
         try {
-            LOG.debug("Waiting for result {} {}",functionName, funcArgs);
+            LOG.debug("Waiting for result {} {}", functionName, funcArgs);
             return req.getResult();
         } catch (DRPCExecutionException e) {
             throw e;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java
index 2d88e8e..01d2392 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.drpc;
 
 import org.apache.storm.generated.AuthorizationException;
@@ -52,7 +53,7 @@ public class DRPCThrift implements DistributedRPC.Iface, 
DistributedRPCInvocatio
 
     @Override
     public String execute(String functionName, String funcArgs)
-            throws DRPCExecutionException, AuthorizationException {
+        throws DRPCExecutionException, AuthorizationException {
         return _drpc.executeBlocking(functionName, funcArgs);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java
index adb916f..06c596e 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.drpc;
 
 import org.apache.storm.generated.DRPCExecutionException;
@@ -42,7 +43,7 @@ public abstract class OutstandingRequest {
     }
 
     public boolean wasFetched() {
-        return _fetched; 
+        return _fetched;
     }
 
     public String getFunction() {
@@ -54,5 +55,6 @@ public abstract class OutstandingRequest {
     }
 
     public abstract void returnResult(String result);
+
     public abstract void fail(DRPCExecutionException e);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java
index 6a0e610..e6cd799 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java
@@ -15,10 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.drpc;
 
 import org.apache.storm.generated.DRPCRequest;
 
 public interface RequestFactory<T extends OutstandingRequest> {
-    public T mkRequest(String function, DRPCRequest req); 
+    public T mkRequest(String function, DRPCRequest req);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
index 85ee27f..abd7cb1 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
index cd6d069..47e4f24 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.metrics.reporters;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
index 3ab6a99..4952051 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.metrics.reporters;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
index c369879..3fc77d1 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.metrics.reporters;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
index 958a01d..0e1a6e3 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
@@ -1,32 +1,28 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Reporter;
-
 import java.io.Closeable;
 import java.util.Map;
 
 
 public interface PreparableReporter<T extends Reporter & Closeable> {
-  void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf);
-  void start();
-  void stop();
+    void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf);
+
+    void start();
+
+    void stop();
 
 }

Reply via email to