Repository: storm
Updated Branches:
  refs/heads/master ab7b4ca77 -> 09e01231c


http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java 
b/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
deleted file mode 100644
index e7ac54e..0000000
--- a/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.dsl.ProducerType;
-import org.junit.Assert;
-import org.junit.Test;
-import junit.framework.TestCase;
-
-public class DisruptorQueueTest extends TestCase {
-
-    private final static int TIMEOUT = 5000; // MS
-    private final static int PRODUCER_NUM = 4;
-
-    @Test
-    public void testFirstMessageFirst() throws InterruptedException {
-      for (int i = 0; i < 100; i++) {
-        DisruptorQueue queue = createQueue("firstMessageOrder", 16);
-
-        queue.publish("FIRST");
-
-        Runnable producer = new IncProducer(queue, i+100);
-
-        final AtomicReference<Object> result = new AtomicReference<>();
-        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
-            private boolean head = true;
-
-            @Override
-            public void onEvent(Object obj, long sequence, boolean endOfBatch)
-                    throws Exception {
-                if (head) {
-                    head = false;
-                    result.set(obj);
-                }
-            }
-        });
-
-        run(producer, consumer, queue);
-        Assert.assertEquals("We expect to receive first published message 
first, but received " + result.get(),
-                "FIRST", result.get());
-      }
-    }
-   
-    @Test 
-    public void testInOrder() throws InterruptedException {
-        final AtomicBoolean allInOrder = new AtomicBoolean(true);
-
-        DisruptorQueue queue = createQueue("consumerHang", 1024);
-        Runnable producer = new IncProducer(queue, 1024*1024);
-        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
-            long _expected = 0;
-            @Override
-            public void onEvent(Object obj, long sequence, boolean endOfBatch)
-                    throws Exception {
-                if (_expected != ((Number)obj).longValue()) {
-                    allInOrder.set(false);
-                    System.out.println("Expected "+_expected+" but got "+obj);
-                }
-                _expected++;
-            }
-        });
-
-        run(producer, consumer, queue, 1000, 1);
-        Assert.assertTrue("Messages delivered out of order",
-                allInOrder.get());
-    }
-
-    @Test 
-    public void testInOrderBatch() throws InterruptedException {
-        final AtomicBoolean allInOrder = new AtomicBoolean(true);
-
-        DisruptorQueue queue = createQueue("consumerHang", 10, 1024);
-        Runnable producer = new IncProducer(queue, 1024*1024);
-        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
-            long _expected = 0;
-            @Override
-            public void onEvent(Object obj, long sequence, boolean endOfBatch)
-                    throws Exception {
-                if (_expected != ((Number)obj).longValue()) {
-                    allInOrder.set(false);
-                    System.out.println("Expected "+_expected+" but got "+obj);
-                }
-                _expected++;
-            }
-        });
-
-        run(producer, consumer, queue, 1000, 1);
-        Assert.assertTrue("Messages delivered out of order",
-                allInOrder.get());
-    }
-
-
-    private void run(Runnable producer, Runnable consumer, DisruptorQueue 
queue)
-            throws InterruptedException {
-        run(producer, consumer, queue, 10, PRODUCER_NUM);
-    }
-
-    private void run(Runnable producer, Runnable consumer, DisruptorQueue 
queue, int sleepMs, int producerNum)
-            throws InterruptedException {
-
-        Thread[] producerThreads = new Thread[producerNum];
-        for (int i = 0; i < producerNum; i++) {
-            producerThreads[i] = new Thread(producer);
-            producerThreads[i].start();
-        }
-        
-        Thread consumerThread = new Thread(consumer);
-        consumerThread.start();
-        Thread.sleep(sleepMs);
-        for (int i = 0; i < producerNum; i++) {
-            producerThreads[i].interrupt();
-        }
-        
-        for (int i = 0; i < producerNum; i++) {
-            producerThreads[i].join(TIMEOUT);
-            assertFalse("producer "+i+" is still alive", 
producerThreads[i].isAlive());
-        }
-        queue.haltWithInterrupt();
-        consumerThread.join(TIMEOUT);
-        assertFalse("consumer is still alive", consumerThread.isAlive());
-    }
-
-    private static class IncProducer implements Runnable {
-        private DisruptorQueue queue;
-        private long _max;
-
-        IncProducer(DisruptorQueue queue, long max) {
-            this.queue = queue;
-            this._max = max;
-        }
-
-        @Override
-        public void run() {
-            for (long i = 0; i < _max && 
!(Thread.currentThread().isInterrupted()); i++) {
-                queue.publish(i);
-            }
-        }
-    }
-
-    private static class Consumer implements Runnable {
-        private EventHandler handler;
-        private DisruptorQueue queue;
-
-        Consumer(DisruptorQueue queue, EventHandler handler) {
-            this.handler = handler;
-            this.queue = queue;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while(true) {
-                    queue.consumeBatchWhenAvailable(handler);
-                }
-            } catch(RuntimeException e) {
-                //break
-            }
-        }
-    }
-
-    private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L);
-    }
-
-    private static DisruptorQueue createQueue(String name, int batchSize, int 
queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 
batchSize, 1L);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java 
b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
new file mode 100644
index 0000000..29ba179
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils;
+
+import org.apache.storm.policy.WaitStrategyPark;
+import org.apache.storm.utils.JCQueue.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+
+public class JCQueueBackpressureTest extends TestCase {
+
+    @Test
+    public void testNoReOrderingUnderBackPressure() throws Exception {
+        final int MESSAGES = 100;
+        final int CAPACITY = 64;
+
+        final JCQueue queue = createQueue("testBackPressure", CAPACITY);
+
+        for (int i = 0; i < MESSAGES; i++) {
+            if (!queue.tryPublish(i)) {
+                Assert.assertTrue(queue.tryPublishToOverflow(i));
+            }
+        }
+
+        TestConsumer consumer = new TestConsumer();
+        queue.consume(consumer);
+        Assert.assertEquals(MESSAGES, consumer.lastMsg);
+
+    }
+
+    private static JCQueue createQueue(String name, int queueSize) {
+        return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0));
+    }
+
+    private static class TestConsumer implements Consumer {
+        int lastMsg = 0;
+
+        @Override
+        public void accept(Object o) {
+            Integer i = (Integer) o;
+            Assert.assertEquals(lastMsg++, i.intValue());
+            System.err.println(i);
+        }
+
+        @Override
+        public void flush() throws InterruptedException
+        { }
+    }
+
+
+    // check that tryPublish() & tryOverflowPublish() work as expected
+    @Test
+    public void testBasicBackPressure() throws Exception {
+        final int MESSAGES = 100;
+        final int CAPACITY = 64;
+
+        final JCQueue queue = createQueue("testBackPressure", CAPACITY);
+
+        // pump more msgs than Q size & verify msg count is as expected
+        for (int i = 0; i < MESSAGES; i++) {
+            if (i>=CAPACITY) {
+                Assert.assertFalse( queue.tryPublish(i) );
+            } else {
+                Assert.assertTrue( queue.tryPublish(i) );
+            }
+        }
+        Assert.assertEquals(CAPACITY, queue.size());
+
+        Assert.assertEquals(0, queue.getOverflowCount());
+
+        // drain 1 element and ensure BP is relieved (i.e. tryPublish() 
succeeds)
+        final MutableLong consumeCount = new MutableLong(0);
+        queue.consume( new TestConsumer() , () -> consumeCount.increment()<=1 
);
+        Assert.assertEquals(CAPACITY-1, queue.size());
+        Assert.assertTrue(queue.tryPublish(0));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java 
b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
new file mode 100644
index 0000000..bdc4937
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.policy.WaitStrategyPark;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class JCQueueTest {
+
+    private final static int TIMEOUT = 5000; // MS
+    private final static int PRODUCER_NUM = 4;
+    IWaitStrategy waitStrategy = new WaitStrategyPark(100);
+
+    @Test(timeout=10000)
+    public void testFirstMessageFirst() throws InterruptedException {
+        for (int i = 0; i < 100; i++) {
+            JCQueue queue = createQueue("firstMessageOrder", 16);
+
+            queue.publish("FIRST");
+
+            Runnable producer = new IncProducer(queue, i+100);
+
+            final AtomicReference<Object> result = new AtomicReference<>();
+            Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
+                private boolean head = true;
+
+                @Override
+                public void accept(Object event) {
+                    if (event == JCQueue.INTERRUPT) {
+                        throw new RuntimeException(new 
InterruptedException("ConsumerThd interrupted") );
+                    }
+                    if (head) {
+                        head = false;
+                        result.set(event);
+                    }
+                }
+
+                @Override
+                public void flush() {
+                    return;
+                }
+            });
+
+            run(producer, consumer, queue);
+            Assert.assertEquals("We expect to receive first published message 
first, but received " + result.get(),
+                    "FIRST", result.get());
+        }
+    }
+
+    @Test(timeout=10000)
+    public void testInOrder() throws InterruptedException {
+        final AtomicBoolean allInOrder = new AtomicBoolean(true);
+
+        JCQueue queue = createQueue("consumerHang", 1024);
+        Runnable producer = new IncProducer(queue, 1024*1024);
+        Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
+            long _expected = 0;
+            @Override
+            public void accept(Object obj) {
+                if (_expected != ((Number) obj).longValue()) {
+                    allInOrder.set(false);
+                    System.out.println("Expected " + _expected + " but got " + 
obj);
+                }
+                _expected++;
+            }
+
+            @Override
+            public void flush() {
+                return;
+            }
+        } ) ;
+        run(producer, consumer, queue, 1000, 1);
+        Assert.assertTrue("Messages delivered out of order",
+                allInOrder.get());
+    }
+
+    @Test(timeout=10000)
+    public void testInOrderBatch() throws InterruptedException {
+        final AtomicBoolean allInOrder = new AtomicBoolean(true);
+
+        JCQueue queue = createQueue("consumerHang", 10, 1024);
+        Runnable producer = new IncProducer(queue, 1024*1024);
+        Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
+            long _expected = 0;
+            @Override
+            public void accept(Object obj) {
+                if (_expected != ((Number)obj).longValue()) {
+                    allInOrder.set(false);
+                    System.out.println("Expected "+_expected+" but got "+obj);
+                }
+                _expected++;
+            }
+
+            @Override
+            public void flush() {
+                return;
+            }
+        });
+
+        run(producer, consumer, queue, 1000, 1);
+        Assert.assertTrue("Messages delivered out of order",
+                allInOrder.get());
+    }
+
+
+    private void run(Runnable producer, Runnable consumer, JCQueue queue)
+            throws InterruptedException {
+        run(producer, consumer, queue, 20, PRODUCER_NUM);
+    }
+
+    private void run(Runnable producer, Runnable consumer, JCQueue queue, int 
sleepMs, int producerNum)
+            throws InterruptedException {
+
+        Thread[] producerThreads = new Thread[producerNum];
+        for (int i = 0; i < producerNum; i++) {
+            producerThreads[i] = new Thread(producer);
+            producerThreads[i].start();
+        }
+
+        Thread consumerThread = new Thread(consumer);
+        consumerThread.start();
+        Thread.sleep(sleepMs);
+        for (int i = 0; i < producerNum; i++) {
+            producerThreads[i].interrupt();
+        }
+        for (int i = 0; i < producerNum; i++) {
+            producerThreads[i].join(TIMEOUT);
+            assertFalse("producer "+i+" is still alive", 
producerThreads[i].isAlive());
+        }
+
+        Thread.sleep(sleepMs);
+        assertTrue("Unable to send halt interrupt", queue.haltWithInterrupt());
+        consumerThread.join(TIMEOUT);
+        assertFalse("consumer is still alive", consumerThread.isAlive());
+    }
+
+    private static class IncProducer implements Runnable {
+        private JCQueue queue;
+        private long _max;
+
+        IncProducer(JCQueue queue, long max) {
+            this.queue = queue;
+            this._max = max;
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (long i = 0; i < _max && 
!(Thread.currentThread().isInterrupted()); i++) {
+                    queue.publish(i);
+                }
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+    }
+
+    private static class ConsumerThd implements Runnable {
+        private JCQueue.Consumer handler;
+        private JCQueue queue;
+
+        ConsumerThd(JCQueue queue, JCQueue.Consumer handler) {
+            this.handler = handler;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while(true) {
+                    queue.consume(handler);
+                }
+            } catch(RuntimeException e) {
+                //break
+            }
+        }
+    }
+
+    private JCQueue createQueue(String name, int queueSize) {
+        return new JCQueue(name, queueSize, 0, 1, waitStrategy);
+    }
+
+    private JCQueue createQueue(String name, int batchSize, int queueSize) {
+        return new JCQueue(name, queueSize, 0, batchSize, waitStrategy);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java
 
b/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java
deleted file mode 100644
index b8e1770..0000000
--- 
a/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.utils;
-
-import java.util.concurrent.atomic.AtomicLong;
-import org.junit.Assert;
-import org.junit.Test;
-import junit.framework.TestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkerBackpressureThreadTest extends TestCase {
-    private static final Logger LOG = 
LoggerFactory.getLogger(WorkerBackpressureThreadTest.class);
-
-    @Test
-    public void testNormalEvent() throws Exception {
-        Object trigger = new Object();
-        AtomicLong workerData = new AtomicLong(0);
-        WorkerBackpressureCallback callback = new WorkerBackpressureCallback() 
{
-            @Override
-            public void onEvent(Object obj) {
-                ((AtomicLong) obj).getAndDecrement();
-            }
-        };
-        WorkerBackpressureThread workerBackpressureThread = new 
WorkerBackpressureThread(trigger, workerData, callback);
-        workerBackpressureThread.start();
-        WorkerBackpressureThread.notifyBackpressureChecker(trigger);
-        long start = System.currentTimeMillis();
-        while (workerData.get() == 0) {
-            assertTrue("Timeout", (System.currentTimeMillis() - start) < 1000);
-            Thread.sleep(100);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git 
a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
 
b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
index b30788a..9bdefb8 100644
--- 
a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
+++ 
b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.clojure;
 
+
 import clojure.lang.ILookup;
 import clojure.lang.ISeq;
-import clojure.lang.AFn;
 import clojure.lang.IPersistentMap;
 import clojure.lang.PersistentArrayMap;
 import clojure.lang.IMapEntry;
@@ -30,12 +30,10 @@ import java.util.Map;
 import java.util.Collection;
 import java.util.Set;
 
-public class IndifferentAccessMap extends AFn implements ILookup, 
IPersistentMap, Map {
+public class IndifferentAccessMap  implements ILookup, IPersistentMap, Map {
 
     protected IPersistentMap _map;
 
-    protected IndifferentAccessMap() {
-    }
 
     public IndifferentAccessMap(IPersistentMap map) {
         setMap(map);
@@ -77,17 +75,6 @@ public class IndifferentAccessMap extends AFn implements 
ILookup, IPersistentMap
         return ret;
     }
 
-    /* IFn */
-    @Override
-    public Object invoke(Object o) {
-        return valAt(o);
-    }
-
-    @Override
-    public Object invoke(Object o, Object notfound) {
-        return valAt(o, notfound);
-    }
-
     /* IPersistentMap */
     /* Naive implementation, but it might be good enough */
     public IPersistentMap assoc(Object k, Object v) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj 
b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 8deefc3..4257d23 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -149,7 +149,7 @@
       (.advanceClusterTime cluster 12)
       (assert-failed tracker 2)
       )))
-      
+
 (defbolt reset-timeout-bolt {} {:prepare true}
   [conf context collector]
   (let [tuple-counter (atom 1)
@@ -179,7 +179,7 @@
           _ (.setAckFailDelegate feeder tracker)
           topology (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails feeder)}
-                     {"2" (Thrift/prepareBoltDetails 
+                     {"2" (Thrift/prepareBoltDetails
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareGlobalGrouping)} 
reset-timeout-bolt)})]
     (.submitTopology cluster
@@ -288,7 +288,7 @@
                                        (doto (CompleteTopologyParam.)
                                          (.setMockedSources (MockedSources. 
{"1" [["a"] ["b"] ["c"]]}))
                                          (.setStormConf {TOPOLOGY-WORKERS 
2})))]
-        (is (Testing/multiseteq [["a"] ["b"] ["c"] ["startup"] ["startup"] 
["startup"]]
+        (is (Testing/multiseteq [["a"] ["b"] ["c"] ]
                  (Testing/readTuples results "2")))
         )))
 
@@ -464,8 +464,8 @@
                             (Thrift/prepareGlobalGrouping)}
                            prepare-tracked-bolt)})]
       (reset! bolt-prepared? false)
-      (reset! spout-opened? false)      
-      
+      (reset! spout-opened? false)
+
       (.submitTopologyWithOpts cluster
         "test"
         {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
@@ -475,7 +475,7 @@
       (.feed feeder ["a"] 1)
       (.advanceClusterTime cluster 9)
       (is (not @bolt-prepared?))
-      (is (not @spout-opened?))        
+      (is (not @spout-opened?))
       (.activate (.getNimbus cluster) "test")
 
       (.advanceClusterTime cluster 12)

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj 
b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
index ffbd0cd..db662bc 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
@@ -20,8 +20,9 @@
   (:import [org.apache.storm Testing Testing$Condition])
   (:use [org.apache.storm util config log])
   (:import [java.util ArrayList]
-           (org.apache.storm.daemon.worker WorkerState)))
-
+           (org.apache.storm.daemon.worker WorkerState))
+  (:import [java.util.concurrent.atomic AtomicBoolean] )
+  )
 (def task 1)
 
 ;; In a "real" cluster (or an integration test), Storm itself would ensure 
that a topology's workers would only be
@@ -71,7 +72,7 @@
         resp (atom nil)
         server (.bind context nil 0)
         _ (register-callback (fn [message] (reset! resp message)) server)
-        client (.connect context nil "localhost" (.getPort server))
+        client (.connect context nil "localhost" (.getPort server) (make-array 
AtomicBoolean 2))
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))]
     (wait-for-not-nil resp)
@@ -90,6 +91,11 @@
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-BUFFER-LOW-WATERMARK 8388608
+                    STORM-MESSAGING-NETTY-BUFFER-HIGH-WATERMARK 16777216
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL1-COUNT  1
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL2-COUNT  1000
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL3-SLEEP-MILLIS 
1
                     TOPOLOGY-KRYO-FACTORY 
"org.apache.storm.serialization.DefaultKryoFactory"
                     TOPOLOGY-TUPLE-SERIALIZER 
"org.apache.storm.serialization.types.ListDelegateSerializer"
                     TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false
@@ -108,7 +114,7 @@
         resp (atom nil)
         server (.bind context nil 0)
         _ (register-callback (fn [message] (reset! resp message)) server)
-        client (.connect context nil "localhost" (.getPort server))
+        client (.connect context nil "localhost" (.getPort server) (make-array 
AtomicBoolean 2))
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         _ (.sendLoadMetrics server {(int 1) 0.0 (int 2) 1.0})
@@ -132,6 +138,11 @@
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-BUFFER-LOW-WATERMARK 8388608
+                    STORM-MESSAGING-NETTY-BUFFER-HIGH-WATERMARK 16777216
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL1-COUNT  1
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL2-COUNT  1000
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL3-SLEEP-MILLIS 
1
                     TOPOLOGY-KRYO-FACTORY 
"org.apache.storm.serialization.DefaultKryoFactory"
                     TOPOLOGY-TUPLE-SERIALIZER 
"org.apache.storm.serialization.types.ListDelegateSerializer"
                     TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false
@@ -150,7 +161,7 @@
         resp (atom nil)
         server (.bind context nil 0)
         _ (register-callback (fn [message] (reset! resp message)) server)
-        client (.connect context nil "localhost" (.getPort server))
+        client (.connect context nil "localhost" (.getPort server) (make-array 
AtomicBoolean 2))
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))]
     (wait-for-not-nil resp)
@@ -169,6 +180,11 @@
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-BUFFER-LOW-WATERMARK 8388608
+                    STORM-MESSAGING-NETTY-BUFFER-HIGH-WATERMARK 16777216
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL1-COUNT  1
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL2-COUNT  1000
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL3-SLEEP-MILLIS 
1
                     TOPOLOGY-KRYO-FACTORY 
"org.apache.storm.serialization.DefaultKryoFactory"
                     TOPOLOGY-TUPLE-SERIALIZER 
"org.apache.storm.serialization.types.ListDelegateSerializer"
                     TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false
@@ -186,7 +202,7 @@
         context (TransportFactory/makeContext storm-conf)
         resp (atom nil)
         port (Utils/getAvailablePort (int 6700))
-        client (.connect context nil "localhost" port)
+        client (.connect context nil "localhost" port (make-array 
AtomicBoolean 2))
 
         server (Thread.
                  (fn []
@@ -215,6 +231,11 @@
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-BUFFER-LOW-WATERMARK 8388608
+                    STORM-MESSAGING-NETTY-BUFFER-HIGH-WATERMARK 16777216
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL1-COUNT  1
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL2-COUNT  1000
+                    TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL3-SLEEP-MILLIS 
1
                     TOPOLOGY-KRYO-FACTORY 
"org.apache.storm.serialization.DefaultKryoFactory"
                     TOPOLOGY-TUPLE-SERIALIZER 
"org.apache.storm.serialization.types.ListDelegateSerializer"
                     TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false
@@ -236,7 +257,7 @@
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil 0)
         _ (register-callback (fn [message] (.add resp message) (swap! received 
inc)) server)
-        client (.connect context nil "localhost" (.getPort server))
+        client (.connect context nil "localhost" (.getPort server) (make-array 
AtomicBoolean 2))
         _ (wait-until-ready [server client])]
     (doseq [num (range 1 num-messages)]
       (let [req_msg (str num)]
@@ -287,6 +308,11 @@
                       STORM-MESSAGING-NETTY-MAX-SLEEP-MS 50
                       STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                       STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                      STORM-MESSAGING-NETTY-BUFFER-LOW-WATERMARK 8388608
+                      STORM-MESSAGING-NETTY-BUFFER-HIGH-WATERMARK 16777216
+                      TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL1-COUNT  1
+                      TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL2-COUNT  1000
+                      
TOPOLOGY-BACKPRESSURE-WAIT-PROGRESSIVE-LEVEL3-SLEEP-MILLIS 1
                       TOPOLOGY-KRYO-FACTORY 
"org.apache.storm.serialization.DefaultKryoFactory"
                       TOPOLOGY-TUPLE-SERIALIZER 
"org.apache.storm.serialization.types.ListDelegateSerializer"
                       TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false
@@ -294,7 +320,7 @@
           resp (atom nil)
           context (TransportFactory/makeContext storm-conf)
           port (Utils/getAvailablePort (int 6700))
-          client (.connect context nil "localhost" port)
+          client (.connect context nil "localhost" port (make-array 
AtomicBoolean 2))
           _ (.send client task (.getBytes req_msg))
           server (.bind context nil port)
           _ (register-callback (fn [message] (reset! resp message)) server)

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj 
b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 4a3f2a8..37f5c74 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1734,7 +1734,7 @@
       (.submitTopology cluster "t1" {TOPOLOGY-WORKERS 1} topology)
       (.debug nimbus "t1" "" true 100))))
 
-;; if the user sends an empty log config, nimbus will say that all 
+;; if the user sends an empty log config, nimbus will say that all
 ;; log configs it contains are LogLevelAction/UNCHANGED
 (deftest empty-save-config-results-in-all-unchanged-actions
   (let [cluster-state (Mockito/mock IStormClusterState)
@@ -1880,8 +1880,7 @@
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. 
inactive-topos))
         (mocking
           [teardown-heartbeats
-           teardown-topo-errors
-           teardown-backpressure-dirs]
+           teardown-topo-errors]
 
           (.doCleanup nimbus)
 
@@ -1893,10 +1892,6 @@
           (verify-nth-call-args-for 1 teardown-topo-errors "topo2")
           (verify-nth-call-args-for 2 teardown-topo-errors "topo3")
 
-          ;; removed backpressure znodes
-          (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2")
-          (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3")
-
           ;; removed topo directories
           (.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo2")
           (.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo3")
@@ -1925,14 +1920,12 @@
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set 
inactive-topos))
         (mocking
           [teardown-heartbeats
-           teardown-topo-errors
-           teardown-backpressure-dirs]
+           teardown-topo-errors]
 
           (.doCleanup nimbus)
 
           (verify-call-times-for teardown-heartbeats 0)
           (verify-call-times-for teardown-topo-errors 0)
-          (verify-call-times-for teardown-backpressure-dirs 0)
           (.forceDeleteTopoDistDir (Mockito/verify nimbus (Mockito/times 0)) 
(Mockito/anyObject))
           (.rmTopologyKeys (Mockito/verify nimbus (Mockito/times 0)) 
(Mockito/anyObject))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-server/src/main/java/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java 
b/storm-server/src/main/java/org/apache/storm/Testing.java
index 1b3bd8e..1d3758f 100644
--- a/storm-server/src/main/java/org/apache/storm/Testing.java
+++ b/storm-server/src/main/java/org/apache/storm/Testing.java
@@ -54,10 +54,10 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.RegisteredGlobalState;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.apache.storm.utils.Utils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -518,7 +518,7 @@ public class Testing {
     public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> 
results, String componentId) {
         return readTuples(results, componentId, Utils.DEFAULT_STREAM_ID);
     }
-    
+
     /**
      * Get all of the tuples from a given component on a given stream
      * @param results the results of running a completed topology
@@ -696,7 +696,7 @@ public class Testing {
         Map<String, Fields> streamToFields = new HashMap<>();
         streamToFields.put(stream, new Fields(fields));
         compToStreamToFields.put(component, streamToFields);
-        
+
         TopologyContext context= new TopologyContext(null,
                 ConfigUtils.readStormConfig(),
                 taskToComp,
@@ -714,6 +714,6 @@ public class Testing {
                 new HashMap<>(),
                 new HashMap<>(),
                 new AtomicBoolean(false));
-        return new TupleImpl(context, values, 1, stream);
+        return new TupleImpl(context, values, component, 1, stream);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 4e2d8fc..c924377 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2151,8 +2151,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 LOG.info("Cleaning up {}", topoId);
                 state.teardownHeartbeats(topoId);
                 state.teardownTopologyErrors(topoId);
-                state.removeBackpressure(topoId);
                 state.removeAllPrivateWorkerKeys(topoId);
+                state.removeBackpressure(topoId);
                 rmDependencyJarsInTopology(topoId);
                 forceDeleteTopoDistDir(topoId);
                 rmTopologyKeys(topoId);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 16f157d..f21c818 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -93,7 +93,7 @@ public class BasicContainer extends Container {
             _exitedEarly = true;
         }
     }
-    
+
     /**
      * Create a new BasicContainer
      * @param type the type of container being made.
@@ -110,7 +110,7 @@ public class BasicContainer extends Container {
             LocalState localState, String workerId) throws IOException {
         this(type, conf, supervisorId, port, assignment, 
resourceIsolationManager, localState, workerId, null, null, null);
     }
-    
+
     /**
      * Create a new BasicContainer
      * @param type the type of container being made.
@@ -122,7 +122,7 @@ public class BasicContainer extends Container {
      * @param localState the local state of the supervisor.  May be null if 
partial recovery
      * @param workerId the id of the worker to use.  Must not be null if doing 
a partial recovery.
      * @param ops file system operations (mostly for testing) if null a new 
one is made
-     * @param topoConf the config of the topology (mostly for testing) if null 
+     * @param topoConf the config of the topology (mostly for testing) if null
      * and not a partial recovery the real conf is read.
      * @param profileCmd the command to use when profiling (used for testing)
      * @throws IOException on any error
@@ -130,7 +130,7 @@ public class BasicContainer extends Container {
      */
     BasicContainer(ContainerType type, Map<String, Object> conf, String 
supervisorId, int port,
             LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager,
-            LocalState localState, String workerId, Map<String, Object> 
topoConf, 
+            LocalState localState, String workerId, Map<String, Object> 
topoConf,
             AdvancedFSOps ops, String profileCmd) throws IOException {
         super(type, conf, supervisorId, port, assignment, 
resourceIsolationManager, workerId, topoConf, ops);
         assert(localState != null);
@@ -238,7 +238,7 @@ public class BasicContainer extends Container {
 
     /**
      * Run the given command for profiling
-     * 
+     *
      * @param command
      *            the command to run
      * @param env
@@ -368,7 +368,7 @@ public class BasicContainer extends Container {
     protected String getWildcardDir(File dir) {
         return dir.toString() + File.separator + "*";
     }
-    
+
     protected List<String> frameworkClasspath(SimpleVersion topoVersion) {
         File stormWorkerLibDir = new File(_stormHome, "lib-worker");
         String topoConfDir =
@@ -384,10 +384,10 @@ public class BasicContainer extends Container {
         pathElements.add(topoConfDir);
 
         NavigableMap<SimpleVersion, List<String>> classpaths = 
Utils.getConfiguredClasspathVersions(_conf, pathElements);
-        
+
         return Utils.getCompatibleVersion(classpaths, topoVersion, 
"classpath", pathElements);
     }
-    
+
     protected String getWorkerMain(SimpleVersion topoVersion) {
         String defaultWorkerGuess = "org.apache.storm.daemon.worker.Worker";
         if (topoVersion.getMajor() == 0) {
@@ -400,7 +400,7 @@ public class BasicContainer extends Container {
         NavigableMap<SimpleVersion,String> mains = 
Utils.getConfiguredWorkerMainVersions(_conf);
         return Utils.getCompatibleVersion(mains, topoVersion, "worker main 
class", defaultWorkerGuess);
     }
-    
+
     protected String getWorkerLogWriter(SimpleVersion topoVersion) {
         String defaultGuess = "org.apache.storm.LogWriter";
         if (topoVersion.getMajor() == 0) {
@@ -410,7 +410,7 @@ public class BasicContainer extends Container {
         NavigableMap<SimpleVersion,String> mains = 
Utils.getConfiguredWorkerLogWriterVersions(_conf);
         return Utils.getCompatibleVersion(mains, topoVersion, "worker log 
writer class", defaultGuess);
     }
-    
+
     @SuppressWarnings("unchecked")
     private List<String> asStringList(Object o) {
         if (o instanceof String) {
@@ -420,7 +420,7 @@ public class BasicContainer extends Container {
         }
         return Collections.EMPTY_LIST;
     }
-    
+
     /**
      * Compute the classpath for the worker process
      * @param stormJar the topology jar
@@ -454,7 +454,7 @@ public class BasicContainer extends Container {
         }
         return string;
     }
-    
+
     protected List<String> substituteChildopts(Object value) {
         return substituteChildopts(value, -1);
     }
@@ -486,7 +486,7 @@ public class BasicContainer extends Container {
 
     /**
      * Launch the worker process (non-blocking)
-     * 
+     *
      * @param command
      *            the command to run
      * @param env
@@ -519,13 +519,13 @@ public class BasicContainer extends Container {
         } else {
             log4jConfigurationDir = _stormHome + File.separator + "log4j2";
         }
- 
+
         if (ServerUtils.IS_ON_WINDOWS && 
!log4jConfigurationDir.startsWith("file:")) {
             log4jConfigurationDir = "file:///" + log4jConfigurationDir;
         }
         return log4jConfigurationDir + File.separator + "worker.xml";
     }
-    
+
     private static class TopologyMetaData {
         private boolean _dataCached = false;
         private List<String> _depLocs = null;
@@ -534,14 +534,14 @@ public class BasicContainer extends Container {
         private final String _topologyId;
         private final AdvancedFSOps _ops;
         private final String _stormRoot;
-        
+
         public TopologyMetaData(final Map<String, Object> conf, final String 
topologyId, final AdvancedFSOps ops, final String stormRoot) {
             _conf = conf;
             _topologyId = topologyId;
             _ops = ops;
             _stormRoot = stormRoot;
         }
-        
+
         public String toString() {
             List<String> data;
             String stormVersion;
@@ -551,7 +551,7 @@ public class BasicContainer extends Container {
             }
             return "META for " + _topologyId +" DEP_LOCS => " + data + " 
STORM_VERSION => " + stormVersion;
         }
-        
+
         private synchronized void readData() throws IOException {
             final StormTopology stormTopology = 
ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
             final List<String> dependencyLocations = new ArrayList<>();
@@ -570,7 +570,7 @@ public class BasicContainer extends Container {
             _stormVersion = stormTopology.get_storm_version();
             _dataCached = true;
         }
-        
+
         public synchronized List<String> getDepLocs() throws IOException {
             if (!_dataCached) {
                 readData();
@@ -588,7 +588,7 @@ public class BasicContainer extends Container {
 
     static class TopoMetaLRUCache {
         public final int _maxSize = 100; //We could make this configurable in 
the future...
-        
+
         @SuppressWarnings("serial")
         private LinkedHashMap<String, TopologyMetaData> _cache = new 
LinkedHashMap<String, TopologyMetaData>() {
             @Override
@@ -596,7 +596,7 @@ public class BasicContainer extends Container {
                 return (size() > _maxSize);
             }
         };
-        
+
         public synchronized TopologyMetaData get(final Map<String, Object> 
conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
             //Only go off of the topology id for now.
             TopologyMetaData dl = _cache.get(topologyId);
@@ -606,22 +606,22 @@ public class BasicContainer extends Container {
             }
             return dl;
         }
-        
+
         public synchronized void clear() {
             _cache.clear();
         }
     }
-    
+
     static final TopoMetaLRUCache TOPO_META_CACHE = new TopoMetaLRUCache();
-    
+
     public static List<String> getDependencyLocationsFor(final Map<String, 
Object> conf, final String topologyId, final AdvancedFSOps ops, String 
stormRoot) throws IOException {
         return TOPO_META_CACHE.get(conf, topologyId, ops, 
stormRoot).getDepLocs();
     }
-    
+
     public static String getStormVersionFor(final Map<String, Object> conf, 
final String topologyId, final AdvancedFSOps ops, String stormRoot) throws 
IOException {
         return TOPO_META_CACHE.get(conf, topologyId, ops, 
stormRoot).getStormVersion();
     }
-    
+
     /**
      * Get parameters for the class path of the worker process.  Also used by 
the
      * log Writer
@@ -633,13 +633,13 @@ public class BasicContainer extends Container {
         final String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
         final List<String> dependencyLocations = 
getDependencyLocationsFor(_conf, _topologyId, _ops, stormRoot);
         final String workerClassPath = getWorkerClassPath(stormJar, 
dependencyLocations, topoVersion);
-        
+
         List<String> classPathParams = new ArrayList<>();
         classPathParams.add("-cp");
         classPathParams.add(workerClassPath);
         return classPathParams;
     }
-    
+
     /**
      * Get a set of java properties that are common to both the log writer and 
the worker processes.
      * These are mostly system properties that are used by logging.
@@ -649,7 +649,7 @@ public class BasicContainer extends Container {
         final String workersArtifacts = ConfigUtils.workerArtifactsRoot(_conf);
         String stormLogDir = ConfigUtils.getLogDir();
         String log4jConfigurationFile = getWorkerLoggingConfigFile();
-        
+
         List<String> commonParams = new ArrayList<>();
         commonParams.add("-Dlogging.sensitivity=" + OR((String) 
_topoConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY), "S3"));
         commonParams.add("-Dlogfile.name=worker.log");
@@ -667,10 +667,10 @@ public class BasicContainer extends Container {
         }
         return commonParams;
     }
-    
+
     private int getMemOnHeap(WorkerResources resources) {
         int memOnheap = 0;
-        if (resources != null && resources.is_set_mem_on_heap() && 
+        if (resources != null && resources.is_set_mem_on_heap() &&
                 resources.get_mem_on_heap() > 0) {
             memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
         } else {
@@ -679,7 +679,7 @@ public class BasicContainer extends Container {
         }
         return memOnheap;
     }
-    
+
     private List<String> getWorkerProfilerChildOpts(int memOnheap) {
         List<String> workerProfilerChildopts = new ArrayList<>();
         if 
(ObjectReader.getBoolean(_conf.get(DaemonConfig.WORKER_PROFILER_ENABLED), 
false)) {
@@ -687,7 +687,7 @@ public class BasicContainer extends Container {
         }
         return workerProfilerChildopts;
     }
-    
+
     protected String javaCmd(String cmd) {
         String ret = null;
         String javaHome = System.getenv().get("JAVA_HOME");
@@ -698,7 +698,7 @@ public class BasicContainer extends Container {
         }
         return ret;
     }
-    
+
     /**
      * Create the command to launch the worker process
      * @param memOnheap the on heap memory for the worker
@@ -718,10 +718,10 @@ public class BasicContainer extends Container {
             topoVersionString = 
(String)_conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, 
VersionInfo.getVersion());
         }
         final SimpleVersion topoVersion = new SimpleVersion(topoVersionString);
-        
+
         List<String> classPathParams = getClassPathParams(stormRoot, 
topoVersion);
         List<String> commonParams = getCommonParams();
-        
+
         List<String> commandList = new ArrayList<>();
         String logWriter = getWorkerLogWriter(topoVersion);
         if (logWriter != null) {

Reply via email to