http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 1d07497..0901cc3 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -24,10 +24,10 @@ import org.apache.storm.messaging.IConnectionCallback;
 import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
 import org.apache.storm.utils.ObjectReader;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
@@ -39,6 +39,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -62,9 +64,10 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
     final ServerBootstrap bootstrap;
  
     private volatile boolean closing = false;
-    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
-    private KryoValuesSerializer _ser;
-    private IConnectionCallback _cb = null; 
+    KryoValuesSerializer _ser;
+    KryoValuesDeserializer deser;
+    private IConnectionCallback _cb = null;
+    private Supplier<Object> newConnectionResponse;
     private final int boundPort;
     
     @SuppressWarnings("rawtypes")
@@ -72,6 +75,7 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
         this.topoConf = topoConf;
         this.port = port;
         _ser = new KryoValuesSerializer(topoConf);
+        deser = new KryoValuesDeserializer(topoConf);
 
         // Configure the server.
         int buffer_size = 
ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -143,12 +147,9 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
         _cb = cb;
     }
 
-    /**
-     * register a newly created channel
-     * @param channel newly created channel
-     */
-    protected void addChannel(Channel channel) {
-        allChannels.add(channel);
+    @Override
+    public void registerNewConnectionResponse(Supplier<Object> 
newConnectionResponse) {
+        this.newConnectionResponse = newConnectionResponse;
     }
 
     /**
@@ -176,14 +177,17 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
     }
 
     @Override
-    public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
-        try {
-            MessageBatch mb = new MessageBatch(1);
-            mb.add(new TaskMessage(-1, 
_ser.serialize(Arrays.asList((Object)taskToLoad))));
-            allChannels.write(mb);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+    synchronized public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+        MessageBatch mb = new MessageBatch(1);
+        mb.add(new TaskMessage(-1, 
_ser.serialize(Arrays.asList((Object)taskToLoad))));
+        allChannels.write(mb);
+    }
+
+    // this method expected to be thread safe
+    @Override
+    synchronized public void sendBackPressureStatus(BackPressureStatus 
bpStatus)  {
+        LOG.info("Sending BackPressure status update to connected workers. 
BPStatus = {}", bpStatus);
+        allChannels.write(bpStatus);
     }
 
     @Override
@@ -264,7 +268,10 @@ class Server extends ConnectionWithStatus implements 
IStatefulObject, ISaslServe
 
     /** Implementing IServer. **/
     public void channelConnected(Channel c) {
-        addChannel(c);
+        if (newConnectionResponse != null) {
+            c.write( newConnectionResponse.get() ); // not synchronized since 
it is not yet in channel grp, so pvt to this thread
+        }
+        allChannels.add(c);
     }
 
     public void received(Object message, String remote, Channel channel)  
throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index 3487003..a10fd02 100644
--- 
a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -23,16 +23,14 @@ import 
org.apache.storm.serialization.KryoValuesDeserializer;
 import java.net.ConnectException;
 import java.util.Map;
 import java.util.List;
-import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
-import org.jboss.netty.channel.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,9 +38,11 @@ public class StormClientHandler extends 
SimpleChannelUpstreamHandler  {
     private static final Logger LOG = 
LoggerFactory.getLogger(StormClientHandler.class);
     private Client client;
     private KryoValuesDeserializer _des;
+    private AtomicBoolean[] remoteBpStatus;
 
-    StormClientHandler(Client client, Map<String, Object> conf) {
+    StormClientHandler(Client client, AtomicBoolean[] remoteBpStatus, 
Map<String, Object> conf) {
         this.client = client;
+        this.remoteBpStatus = remoteBpStatus;
         _des = new KryoValuesDeserializer(conf);
     }
 
@@ -55,20 +55,29 @@ public class StormClientHandler extends 
SimpleChannelUpstreamHandler  {
             if (msg==ControlMessage.FAILURE_RESPONSE) {
                 LOG.info("failure response:{}", msg);
             }
-        } else if (message instanceof List) {
-            try {
-                //This should be the metrics, and there should only be one of 
them
-                List<TaskMessage> list = (List<TaskMessage>)message;
-                if (list.size() < 1) throw new RuntimeException("Didn't see 
enough load metrics ("+client.getDstAddress()+") "+list);
-                TaskMessage tm = ((List<TaskMessage>)message).get(list.size() 
- 1);
-                if (tm.task() != -1) throw new RuntimeException("Metrics 
messages are sent to the system task ("+client.getDstAddress()+") "+tm);
-                List metrics = _des.deserialize(tm.message());
-                if (metrics.size() < 1) throw new RuntimeException("No metrics 
data in the metrics message ("+client.getDstAddress()+") "+metrics);
-                if (!(metrics.get(0) instanceof Map)) throw new 
RuntimeException("The metrics did not have a map in the first slot 
("+client.getDstAddress()+") "+metrics);
-                client.setLoadMetrics((Map<Integer, Double>)metrics.get(0));
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+        } else if (message instanceof BackPressureStatus) {
+            BackPressureStatus status = (BackPressureStatus) message;
+            if (status.bpTasks != null) {
+                for (Integer bpTask : status.bpTasks) {
+                    remoteBpStatus[bpTask].set(true);
+                }
+            }
+            if (status.nonBpTasks != null) {
+                for (Integer bpTask : status.nonBpTasks) {
+                    remoteBpStatus[bpTask].set(false);
+                }
             }
+            LOG.debug("Received BackPressure status update : {}", status);
+        } else if (message instanceof List) {
+            //This should be the metrics, and there should only be one of them
+            List<TaskMessage> list = (List<TaskMessage>)message;
+            if (list.size() < 1) throw new RuntimeException("Didn't see enough 
load metrics ("+client.getDstAddress()+") "+list);
+            TaskMessage tm = ((List<TaskMessage>)message).get(list.size() - 1);
+            if (tm.task() != -1) throw new RuntimeException("Metrics messages 
are sent to the system task ("+client.getDstAddress()+") "+tm);
+            List metrics = _des.deserialize(tm.message());
+            if (metrics.size() < 1) throw new RuntimeException("No metrics 
data in the metrics message ("+client.getDstAddress()+") "+metrics);
+            if (!(metrics.get(0) instanceof Map)) throw new 
RuntimeException("The metrics did not have a map in the first slot 
("+client.getDstAddress()+") "+metrics);
+            client.setLoadMetrics((Map<Integer, Double>)metrics.get(0));
         } else {
             throw new RuntimeException("Don't know how to handle a message of 
type "
                                        + message + " (" + 
client.getDstAddress() + ")");

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
index 3fd7641..a16f3f5 100644
--- 
a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
+++ 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
@@ -23,13 +23,16 @@ import org.jboss.netty.channel.Channels;
 
 import org.apache.storm.Config;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 class StormClientPipelineFactory implements ChannelPipelineFactory {
     private Client client;
+    private AtomicBoolean[] remoteBpStatus;
     private Map<String, Object> conf;
 
-    StormClientPipelineFactory(Client client, Map<String, Object> conf) {
+    StormClientPipelineFactory(Client client, AtomicBoolean[] remoteBpStatus, 
Map<String, Object> conf) {
         this.client = client;
+        this.remoteBpStatus = remoteBpStatus;
         this.conf = conf;
     }
 
@@ -38,9 +41,9 @@ class StormClientPipelineFactory implements 
ChannelPipelineFactory {
         ChannelPipeline pipeline = Channels.pipeline();
 
         // Decoder
-        pipeline.addLast("decoder", new MessageDecoder());
+        pipeline.addLast("decoder", new MessageDecoder(client.deser));
         // Encoder
-        pipeline.addLast("encoder", new MessageEncoder());
+        pipeline.addLast("encoder", new MessageEncoder(client.ser));
 
         boolean isNettyAuth = (Boolean) conf
                 .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
@@ -50,7 +53,7 @@ class StormClientPipelineFactory implements 
ChannelPipelineFactory {
                     client));
         }
         // business logic.
-        pipeline.addLast("handler", new StormClientHandler(client, conf));
+        pipeline.addLast("handler", new StormClientHandler(client, 
remoteBpStatus, conf));
         return pipeline;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
index 2e4f418..b67e264 100644
--- 
a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
+++ 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.messaging.netty;
 
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
@@ -35,9 +37,9 @@ class StormServerPipelineFactory implements 
ChannelPipelineFactory {
         ChannelPipeline pipeline = Channels.pipeline();
 
         // Decoder
-        pipeline.addLast("decoder", new MessageDecoder());
+        pipeline.addLast("decoder", new MessageDecoder(server.deser));
         // Encoder
-        pipeline.addLast("encoder", new MessageEncoder());
+        pipeline.addLast("encoder", new MessageEncoder(server._ser));
 
         boolean isNettyAuth = (Boolean) this.server.topoConf
                 .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java 
b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
new file mode 100644
index 0000000..f41492e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.policy;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.ReflectionUtils;
+
+import java.util.Map;
+
+
+public interface IWaitStrategy {
+    enum WAIT_SITUATION {BOLT_WAIT, BACK_PRESSURE_WAIT}
+
+    void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation);
+
+    /**
+     * Implementations of this method should be thread-safe (preferably no 
side-effects and lock-free)
+     * <p>
+     * Supports static or dynamic backoff. Dynamic backoff relies on 
idleCounter to
+     * estimate how long caller has been idling.
+     * <p>
+     * <pre>
+     * <code>
+     *  int idleCounter = 0;
+     *  int consumeCount = consumeFromQ();
+     *  while (consumeCount==0) {
+     *     idleCounter = strategy.idle(idleCounter);
+     *     consumeCount = consumeFromQ();
+     *  }
+     * </code>
+     * </pre>
+     *
+     * @param idleCounter managed by the idle method until reset
+     * @return new counter value to be used on subsequent idle cycle
+     */
+    int idle(int idleCounter) throws InterruptedException;
+
+    static IWaitStrategy createBackPressureWaitStrategy(Map<String, Object> 
topologyConf) {
+        IWaitStrategy producerWaitStrategy = 
ReflectionUtils.newInstance((String) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
+        producerWaitStrategy.prepare(topologyConf, 
WAIT_SITUATION.BACK_PRESSURE_WAIT);
+        return producerWaitStrategy;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java 
b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
new file mode 100644
index 0000000..0406fd2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.policy;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.ObjectReader;
+
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
+public class WaitStrategyPark implements IWaitStrategy {
+    private long parkTimeNanoSec;
+
+    @Override
+    public void prepare(Map<String, Object> conf, WAIT_SITUATION 
waitSituation) {
+        if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
+            parkTimeNanoSec = 1_000 * 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC));
+        } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
+            parkTimeNanoSec = 1_000 * 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC));
+        } else {
+            throw new IllegalArgumentException("Unknown wait situation : " + 
waitSituation);
+        }
+    }
+
+    public WaitStrategyPark() { // required for instantiation via reflection. 
must call prepare() thereafter
+    }
+
+    // Convenience alternative to prepare() for use in Tests
+    public WaitStrategyPark(long microsec) {
+        parkTimeNanoSec = microsec * 1_000;
+    }
+
+
+    @Override
+    public int idle(int idleCounter) throws InterruptedException {
+        if (parkTimeNanoSec == 0) {
+            return 1;
+        }
+        LockSupport.parkNanos(parkTimeNanoSec);
+        return idleCounter + 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java 
b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
new file mode 100644
index 0000000..067ca71
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.policy;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.ObjectReader;
+
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A Progressive Wait Strategy
+ * <p> Has three levels of idling. Stays in each level for a configured number 
of iterations before entering the next level.
+ * Level 1 - No idling. Returns immediately. Stays in this level for 
`level1Count` iterations.
+ * Level 2 - Calls LockSupport.parkNanos(1). Stays in this level for 
`level2Count` iterations
+ * Level 3 - Calls Thread.sleep(). Stays in this level until wait situation 
changes.
+ *
+ * <p>
+ * The initial spin can be useful to prevent downstream bolt from repeatedly 
sleeping/parking when
+ * the upstream component is a bit relatively slower. Allows downstream bolt 
can enter deeper wait states only
+ * if the traffic to it appears to have reduced.
+ * <p>
+ */
+public class WaitStrategyProgressive implements IWaitStrategy {
+    private int  level1Count;
+    private int  level2Count;
+    private long level3SleepMs;
+
+    @Override
+    public void prepare(Map<String, Object> conf, WAIT_SITUATION 
waitSituation) {
+        if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
+            level1Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
+            level2Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
+            level3SleepMs = 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
+        } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
+            level1Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT));
+            level2Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT));
+            level3SleepMs = 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
+        } else {
+            throw new IllegalArgumentException("Unknown wait situation : " + 
waitSituation);
+        }
+    }
+
+    @Override
+    public int idle(int idleCounter) throws InterruptedException {
+        if (idleCounter < level1Count) {                     // level 1 - no 
waiting
+            ++idleCounter;
+        } else if (idleCounter < level1Count * level2Count) { // level 2 - 
parkNanos(1L)
+            ++idleCounter;
+            LockSupport.parkNanos(1L);
+        } else {                                      // level 3 - longer 
idling with Thread.sleep()
+            Thread.sleep(level3SleepMs);
+        }
+        return idleCounter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java
 
b/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java
index f2d8bf1..1be7558 100644
--- 
a/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java
+++ 
b/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java
@@ -19,7 +19,6 @@ package org.apache.storm.serialization;
 
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.MessageId;
-import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import com.esotericsoftware.kryo.io.Input;
 import java.io.IOException;
@@ -39,7 +38,8 @@ public class KryoTupleDeserializer implements 
ITupleDeserializer {
         _kryoInput = new Input(1);
     }        
 
-    public Tuple deserialize(byte[] ser) {
+    @Override
+    public TupleImpl deserialize(byte[] ser) {
         try {
             _kryoInput.setBuffer(ser);
             int taskId = _kryoInput.readInt(true);
@@ -48,7 +48,7 @@ public class KryoTupleDeserializer implements 
ITupleDeserializer {
             String streamName = _ids.getStreamName(componentName, streamId);
             MessageId id = MessageId.deserialize(_kryoInput);
             List<Object> values = _kryo.deserializeFrom(_kryoInput);
-            return new TupleImpl(_context, values, taskId, streamName, id);
+            return new TupleImpl(_context, values, componentName, taskId, 
streamName, id);
         } catch(IOException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java
 
b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java
index 2ef326a..2d83584 100644
--- 
a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java
+++ 
b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java
@@ -38,12 +38,12 @@ public class KryoValuesDeserializer {
        return delegate.getDelegate();
     }
     
-    public List<Object> deserialize(byte[] ser) throws IOException {
+    public List<Object> deserialize(byte[] ser) {
         _kryoInput.setBuffer(ser);
         return deserializeFrom(_kryoInput);
     }
     
-    public Object deserializeObject(byte[] ser) throws IOException {
+    public Object deserializeObject(byte[] ser)  {
         _kryoInput.setBuffer(ser);
         return _kryo.readClassAndObject(_kryoInput);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java 
b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java
index af41e15..8c5c5c1 100644
--- 
a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java
+++ 
b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java
@@ -20,7 +20,6 @@ package org.apache.storm.serialization;
 import org.apache.storm.utils.ListDelegate;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Output;
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -35,7 +34,7 @@ public class KryoValuesSerializer {
         _kryoOut = new Output(2000, 2000000000);
     }
     
-    public void serializeInto(List<Object> values, Output out) throws 
IOException {
+    public void serializeInto(List<Object> values, Output out) {
         // this ensures that list of values is always written the same way, 
regardless
         // of whether it's a java collection or one of clojure's persistent 
collections 
         // (which have different serializers)
@@ -44,7 +43,7 @@ public class KryoValuesSerializer {
         _kryo.writeObject(out, _delegate); 
     }
     
-    public byte[] serialize(List<Object> values) throws IOException {
+    public byte[] serialize(List<Object> values) {
         _kryoOut.clear();
         serializeInto(values, _kryoOut);
         return _kryoOut.toBytes();

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java 
b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
index d6c54a3..8f69b17 100644
--- 
a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
+++ 
b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
@@ -20,6 +20,7 @@ package org.apache.storm.serialization;
 import org.apache.storm.Config;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.serialization.types.ArrayListSerializer;
 import org.apache.storm.serialization.types.HashMapSerializer;
 import org.apache.storm.serialization.types.HashSetSerializer;
@@ -72,6 +73,7 @@ public class SerializationFactory {
         
k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
         
k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
         k.register(ConsList.class);
+        k.register(BackPressureStatus.class);
         
         synchronized (loader) {
             for (SerializationRegister sr: loader) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java 
b/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java
index 4624cf7..f8a5a6c 100644
--- a/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java
@@ -21,6 +21,11 @@ import org.apache.storm.task.IErrorReporter;
 
 import java.util.List;
 
+/**
+ *   Methods are not expected to be thread safe. Each thread expected to have 
a separate instance of this type of object, or else
+ *   externally synchronize any shared instance.
+ */
+
 public interface ISpoutOutputCollector extends IErrorReporter{
     /**
         Returns the task ids that received the tuples.
@@ -28,5 +33,6 @@ public interface ISpoutOutputCollector extends IErrorReporter{
     List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
     void emitDirect(int taskId, String streamId, List<Object> tuple, Object 
messageId);
     long getPendingCount();
+    void flush();
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java 
b/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java
index 8a7beb6..7666d1c 100644
--- a/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -63,7 +63,7 @@ public class SpoutOutputCollector implements 
ISpoutOutputCollector {
      * @return the list of task ids that this tuple was sent to
      */
     public List<Integer> emit(List<Object> tuple, Object messageId) {
-        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
+        return _delegate.emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
     }
 
     /**
@@ -133,6 +133,11 @@ public class SpoutOutputCollector implements 
ISpoutOutputCollector {
     }
 
     @Override
+    public void flush() {
+        _delegate.flush();
+    }
+
+    @Override
     public void reportError(Throwable error) {
         _delegate.reportError(error);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java 
b/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
index bb61921..5d6ef17 100644
--- a/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
+++ b/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
@@ -125,7 +125,7 @@ public class DefaultStateSerializer<T> implements 
Serializer<T> {
         public TupleImpl read(Kryo kryo, Input input, Class<TupleImpl> type) {
             int length = input.readInt();
             byte[] bytes = input.readBytes(length);
-            return (TupleImpl) tupleDeserializer.deserialize(bytes);
+            return tupleDeserializer.deserialize(bytes);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java 
b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 1bacab2..78246bb 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -30,40 +30,49 @@ import java.util.List;
 @SuppressWarnings("unchecked")
 public class BoltExecutorStats extends CommonStats {
 
-    public static final String ACKED = "acked";
-    public static final String FAILED = "failed";
-    public static final String EXECUTED = "executed";
-    public static final String PROCESS_LATENCIES = "process-latencies";
-    public static final String EXECUTE_LATENCIES = "execute-latencies";
+    MultiCountStatAndMetric   ackedStats;
+    MultiCountStatAndMetric   failedStats;
+    MultiCountStatAndMetric   executedStats;
+    MultiLatencyStatAndMetric processLatencyStats;
+    MultiLatencyStatAndMetric executeLatencyStats;
 
     public BoltExecutorStats(int rate,int numStatBuckets) {
         super(rate,numStatBuckets);
-
-        this.put(ACKED, new MultiCountStatAndMetric(numStatBuckets));
-        this.put(FAILED, new MultiCountStatAndMetric(numStatBuckets));
-        this.put(EXECUTED, new MultiCountStatAndMetric(numStatBuckets));
-        this.put(PROCESS_LATENCIES, new 
MultiLatencyStatAndMetric(numStatBuckets));
-        this.put(EXECUTE_LATENCIES, new 
MultiLatencyStatAndMetric(numStatBuckets));
+        this.ackedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.failedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.executedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.processLatencyStats = new 
MultiLatencyStatAndMetric(numStatBuckets);
+        this.executeLatencyStats = new 
MultiLatencyStatAndMetric(numStatBuckets);
     }
 
     public MultiCountStatAndMetric getAcked() {
-        return (MultiCountStatAndMetric) this.get(ACKED);
+        return ackedStats;
     }
 
     public MultiCountStatAndMetric getFailed() {
-        return (MultiCountStatAndMetric) this.get(FAILED);
+        return failedStats;
     }
 
     public MultiCountStatAndMetric getExecuted() {
-        return (MultiCountStatAndMetric) this.get(EXECUTED);
+        return executedStats;
     }
 
     public MultiLatencyStatAndMetric getProcessLatencies() {
-        return (MultiLatencyStatAndMetric) this.get(PROCESS_LATENCIES);
+        return processLatencyStats;
     }
 
     public MultiLatencyStatAndMetric getExecuteLatencies() {
-        return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES);
+        return executeLatencyStats;
+    }
+
+    @Override
+    public void cleanupStats() {
+        ackedStats.close();
+        failedStats.close();
+        executedStats.close();
+        processLatencyStats.close();
+        executeLatencyStats.close();
+        super.cleanupStats();
     }
 
     public void boltExecuteTuple(String component, String stream, long 
latencyMs) {
@@ -88,17 +97,17 @@ public class BoltExecutorStats extends CommonStats {
     public ExecutorStats renderStats() {
         ExecutorStats ret = new ExecutorStats();
         // common stats
-        ret.set_emitted(valueStat(EMITTED));
-        ret.set_transferred(valueStat(TRANSFERRED));
+        ret.set_emitted(valueStat(getEmitted()));
+        ret.set_transferred(valueStat(getTransferred()));
         ret.set_rate(this.rate);
 
         // bolt stats
         BoltStats boltStats = new BoltStats(
-                StatsUtil.windowSetConverter(valueStat(ACKED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-                StatsUtil.windowSetConverter(valueStat(FAILED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-                StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-                StatsUtil.windowSetConverter(valueStat(EXECUTED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-                StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY));
+                StatsUtil.windowSetConverter(valueStat(ackedStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(failedStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(processLatencyStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(executedStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(executeLatencyStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY));
         ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
 
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java 
b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
index a66a92c..f115d12 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -17,29 +17,23 @@
  */
 package org.apache.storm.stats;
 
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.storm.generated.ExecutorStats;
-import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
 @SuppressWarnings("unchecked")
 public abstract class CommonStats {
 
-    public static final String RATE = "rate";
-
-    public static final String EMITTED = "emitted";
-    public static final String TRANSFERRED = "transferred";
-    public static final String[] COMMON_FIELDS = {EMITTED, TRANSFERRED};
+    private final MultiCountStatAndMetric emittedStats;
+    private final MultiCountStatAndMetric transferredStats;
 
     protected final int rate;
-    protected final Map<String, IMetric> metricMap = new HashMap<>();
 
     public CommonStats(int rate,int numStatBuckets) {
         this.rate = rate;
-        this.put(EMITTED, new MultiCountStatAndMetric(numStatBuckets));
-        this.put(TRANSFERRED, new MultiCountStatAndMetric(numStatBuckets));
+        this.emittedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.transferredStats = new MultiCountStatAndMetric(numStatBuckets);
     }
 
     public int getRate() {
@@ -47,19 +41,11 @@ public abstract class CommonStats {
     }
 
     public MultiCountStatAndMetric getEmitted() {
-        return (MultiCountStatAndMetric) get(EMITTED);
+        return emittedStats;
     }
 
     public MultiCountStatAndMetric getTransferred() {
-        return (MultiCountStatAndMetric) get(TRANSFERRED);
-    }
-
-    public IMetric get(String field) {
-        return (IMetric) StatsUtil.getByKey(metricMap, field);
-    }
-
-    protected void put(String field, Object value) {
-        StatsUtil.putKV(metricMap, field, value);
+        return transferredStats;
     }
 
     public void emittedTuple(String stream) {
@@ -71,42 +57,16 @@ public abstract class CommonStats {
     }
 
     public void cleanupStats() {
-        for (Object imetric : this.metricMap.values()) {
-            cleanupStat((IMetric) imetric);
-        }
-    }
-
-    private void cleanupStat(IMetric metric) {
-        if (metric instanceof MultiCountStatAndMetric) {
-            ((MultiCountStatAndMetric) metric).close();
-        } else if (metric instanceof MultiLatencyStatAndMetric) {
-            ((MultiLatencyStatAndMetric) metric).close();
-        }
+        emittedStats.close();
+        transferredStats.close();
     }
 
-    protected Map valueStats(String[] fields) {
-        Map ret = new HashMap();
-        for (String field : fields) {
-            IMetric metric = this.get(field);
-            if (metric instanceof MultiCountStatAndMetric) {
-                StatsUtil.putKV(ret, field, ((MultiCountStatAndMetric) 
metric).getTimeCounts());
-            } else if (metric instanceof MultiLatencyStatAndMetric) {
-                StatsUtil.putKV(ret, field, ((MultiLatencyStatAndMetric) 
metric).getTimeLatAvg());
-            }
-        }
-        StatsUtil.putKV(ret, CommonStats.RATE, this.getRate());
-
-        return ret;
+    protected Map valueStat(MultiCountStatAndMetric metric) {
+        return metric.getTimeCounts();
     }
 
-    protected Map valueStat(String field) {
-        IMetric metric = this.get(field);
-        if (metric instanceof MultiCountStatAndMetric) {
-            return ((MultiCountStatAndMetric) metric).getTimeCounts();
-        } else if (metric instanceof MultiLatencyStatAndMetric) {
-            return ((MultiLatencyStatAndMetric) metric).getTimeLatAvg();
-        }
-        return null;
+    protected Map valueStat(MultiLatencyStatAndMetric metric) {
+        return metric.getTimeLatAvg();
     }
 
     public abstract ExecutorStats renderStats();

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java 
b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 0addbdc..6c3d589 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -26,27 +26,35 @@ import 
org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 @SuppressWarnings("unchecked")
 public class SpoutExecutorStats extends CommonStats {
 
-    public static final String ACKED = "acked";
-    public static final String FAILED = "failed";
-    public static final String COMPLETE_LATENCIES = "complete-latencies";
+    private final MultiCountStatAndMetric ackedStats;
+    private final MultiCountStatAndMetric failedStats;
+    private final MultiLatencyStatAndMetric completeLatencyStats;
 
     public SpoutExecutorStats(int rate,int numStatBuckets) {
         super(rate,numStatBuckets);
-        this.put(ACKED, new MultiCountStatAndMetric(numStatBuckets));
-        this.put(FAILED, new MultiCountStatAndMetric(numStatBuckets));
-        this.put(COMPLETE_LATENCIES, new 
MultiLatencyStatAndMetric(numStatBuckets));
+        this.ackedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.failedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.completeLatencyStats = new 
MultiLatencyStatAndMetric(numStatBuckets);
     }
 
     public MultiCountStatAndMetric getAcked() {
-        return (MultiCountStatAndMetric) this.get(ACKED);
+        return ackedStats;
     }
 
     public MultiCountStatAndMetric getFailed() {
-        return (MultiCountStatAndMetric) this.get(FAILED);
+        return failedStats;
     }
 
     public MultiLatencyStatAndMetric getCompleteLatencies() {
-        return (MultiLatencyStatAndMetric) this.get(COMPLETE_LATENCIES);
+        return completeLatencyStats;
+    }
+
+    @Override
+    public void cleanupStats() {
+        ackedStats.close();
+        failedStats.close();
+        completeLatencyStats.close();
+        super.cleanupStats();
     }
 
     public void spoutAckedTuple(String stream, long latencyMs) {
@@ -62,13 +70,13 @@ public class SpoutExecutorStats extends CommonStats {
     public ExecutorStats renderStats() {
         ExecutorStats ret = new ExecutorStats();
         // common fields
-        ret.set_emitted(valueStat(EMITTED));
-        ret.set_transferred(valueStat(TRANSFERRED));
+        ret.set_emitted(valueStat(getEmitted()));
+        ret.set_transferred(valueStat(getTransferred()));
         ret.set_rate(this.rate);
 
         // spout stats
         SpoutStats spoutStats = new SpoutStats(
-                valueStat(ACKED), valueStat(FAILED), 
valueStat(COMPLETE_LATENCIES));
+                valueStat(ackedStats), valueStat(failedStats), 
valueStat(completeLatencyStats));
         ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
 
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java 
b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
index deae7cf..d59c9cf 100644
--- a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
@@ -24,6 +24,7 @@ import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ThriftTopologyUtils;
 import java.util.ArrayList;
@@ -41,8 +42,9 @@ public class GeneralTopologyContext implements JSONAware {
     private Map<String, List<Integer>> _componentToTasks;
     private Map<String, Map<String, Fields>> _componentToStreamToFields;
     private String _stormId;
-    protected Map _topoConf;
-    
+    protected Map<String, Object> _topoConf;
+    protected boolean _doSanityCheck;
+
     // pass in componentToSortedTasks for the case of running tons of tasks in 
single executor
     public GeneralTopologyContext(StormTopology topology, Map<String, Object> 
topoConf,
             Map<Integer, String> taskToComponent, Map<String, List<Integer>> 
componentToSortedTasks,
@@ -53,6 +55,7 @@ public class GeneralTopologyContext implements JSONAware {
         _stormId = stormId;
         _componentToTasks = componentToSortedTasks;
         _componentToStreamToFields = componentToStreamToFields;
+        _doSanityCheck = ConfigUtils.isLocalMode(_topoConf);
     }
 
     /**
@@ -203,4 +206,8 @@ public class GeneralTopologyContext implements JSONAware {
     public Map<String, Object> getConf() {
         return _topoConf;
     }
+
+    public boolean doSanityCheck() {
+        return _doSanityCheck;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java 
b/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
index cda4d9f..f740060 100644
--- a/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
@@ -18,6 +18,7 @@
 package org.apache.storm.task;
 
 import org.apache.storm.tuple.Tuple;
+
 import java.util.Collection;
 import java.util.List;
 
@@ -30,4 +31,5 @@ public interface IOutputCollector extends IErrorReporter {
     void ack(Tuple input);
     void fail(Tuple input);
     void resetTimeout(Tuple input);
+    void flush();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java 
b/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java
index 24f58ac..7ce9ee0 100644
--- a/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -233,4 +233,9 @@ public class OutputCollector implements IOutputCollector {
     public void reportError(Throwable error) {
         _delegate.reportError(error);
     }
+
+    @Override
+    public void flush() {
+        _delegate.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java 
b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index 601d70f..0a1765b 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,7 +32,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -272,7 +271,7 @@ public class TopologyContext extends WorkerTopologyContext 
implements IMetricsCo
         _hooks.add(hook);
     }
 
-    public Collection<ITaskHook> getHooks() {
+    public List<ITaskHook> getHooks() {
         return _hooks;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java 
b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
index 33d62ec..5f75eae 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
@@ -62,6 +62,11 @@ public class SpoutTracker extends BaseRichSpout {
         }
 
         @Override
+        public void flush() {
+            _collector.flush();
+        }
+
+        @Override
         public void reportError(Throwable error) {
                _collector.reportError(error);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java 
b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
index 7392c9c..e8899ed 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
@@ -42,6 +42,11 @@ public interface TridentCollector {
     void emit(List<Object> values);
 
     /**
+     * Flush any buffered tuples (when batching is enabled)
+     */
+    void flush();
+
+    /**
      * Reports an error. The corresponding stack trace will be visible in the 
Storm UI.
      *
      * Note that calling this method does not alter the processing of a batch. 
To explicitly fail a batch and trigger

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java
 
b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java
index c187857..df4561a 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java
@@ -36,6 +36,11 @@ public class CaptureCollector implements TridentCollector {
     }
 
     @Override
+    public void flush() {
+        _coll.flush();
+    }
+
+    @Override
     public void reportError(Throwable t) {
         _coll.reportError(t);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java
 
b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java
index 0f0d014..e70cc28 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java
@@ -41,6 +41,11 @@ public class GroupCollector implements TridentCollector {
     }
 
     @Override
+    public void flush() {
+        _collector.flush();
+    }
+
+    @Override
     public void reportError(Throwable t) {
         _collector.reportError(t);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java 
b/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java
index 9626f98..6ccea23 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java
@@ -34,5 +34,9 @@ public class BridgeReceiver implements TupleReceiver {
     public void execute(ProcessorContext context, String streamId, 
TridentTuple tuple) {
         _collector.emit(streamId, new ConsList(context.batchId, tuple));
     }
-    
+
+    @Override
+    public void flush() {
+        _collector.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java 
b/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java
index 4b96465..1c5a1d6 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java
@@ -23,5 +23,5 @@ import org.apache.storm.trident.tuple.TridentTuple;
 public interface TupleReceiver {
     //streaId indicates where tuple came from
     void execute(ProcessorContext processorContext, String streamId, 
TridentTuple tuple);
-    
+    void flush();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java
index b849418..90efd5a 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java
@@ -70,7 +70,12 @@ public class AggregateProcessor implements TridentProcessor {
         _collector.setContext(processorContext);
         _agg.aggregate(processorContext.state[_context.getStateIndex()], 
_projection.create(tuple), _collector);
     }
-    
+
+    @Override
+    public void flush() {
+        _collector.flush();
+    }
+
     @Override
     public void finishBatch(ProcessorContext processorContext) {
         _collector.setContext(processorContext);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java
index a4ac745..36664b0 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java
@@ -52,6 +52,13 @@ public class AppendCollector implements TridentCollector {
     }
 
     @Override
+    public void flush() {
+        for(TupleReceiver r: _triContext.getReceivers()) {
+            r.flush();
+        }
+    }
+
+    @Override
     public void reportError(Throwable t) {
         _triContext.getDelegateCollector().reportError(t);
     } 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java
index c9b4508..2cadd22 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java
@@ -55,6 +55,11 @@ public class EachProcessor implements TridentProcessor {
     }
 
     @Override
+    public void flush() {
+        _collector.flush();
+    }
+
+    @Override
     public void cleanup() {
         _function.cleanup();
     }    
@@ -65,6 +70,7 @@ public class EachProcessor implements TridentProcessor {
         _function.execute(_projection.create(tuple), _collector);
     }
 
+
     @Override
     public void startBatch(ProcessorContext processorContext) {
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java
index 999b54a..6a57943 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java
@@ -49,6 +49,13 @@ public class FreshCollector implements TridentCollector {
     }
 
     @Override
+    public void flush() {
+        for(TupleReceiver r: _triContext.getReceivers()) {
+            r.flush();
+        }
+    }
+
+    @Override
     public void reportError(Throwable t) {
         _triContext.getDelegateCollector().reportError(t);
     } 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
index 110cdaf..9e7d603 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
@@ -24,6 +24,7 @@ import org.apache.storm.trident.operation.MapFunction;
 import org.apache.storm.trident.operation.TridentOperationContext;
 import org.apache.storm.trident.planner.ProcessorContext;
 import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.planner.TupleReceiver;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.tuple.TridentTupleView;
 import org.apache.storm.tuple.Fields;
@@ -71,6 +72,11 @@ public class MapProcessor implements TridentProcessor {
     }
 
     @Override
+    public void flush() {
+        _collector.flush();
+    }
+
+    @Override
     public void startBatch(ProcessorContext processorContext) {
         // NOOP
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java
index 77dead1..1b7feb4 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java
@@ -79,7 +79,11 @@ public class MultiReducerProcessor implements 
TridentProcessor {
         int i = _streamToIndex.get(streamId);
         _reducer.execute(processorContext.state[_context.getStateIndex()], i, 
_projectionFactories[i].create(tuple), _collector);
     }
-    
+
+    @Override
+    public void flush() {
+        _collector.flush();
+    }
     @Override
     public void finishBatch(ProcessorContext processorContext) {
         _collector.setContext(processorContext);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java
index af4c8f6..e00c15c 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java
@@ -77,6 +77,11 @@ public class PartitionPersistProcessor implements 
TridentProcessor {
     }
 
     @Override
+    public void flush() {
+        // NO-OP
+    }
+    
+    @Override
     public void finishBatch(ProcessorContext processorContext) {
         _collector.setContext(processorContext);
         Object batchId = processorContext.batchId;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java
index 4ecb251..d80591e 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java
@@ -63,6 +63,13 @@ public class ProjectedProcessor implements TridentProcessor {
     }
 
     @Override
+    public void flush() {
+        for(TupleReceiver r: _context.getReceivers()) {
+            r.flush();
+        }
+    }
+
+    @Override
     public void finishBatch(ProcessorContext processorContext) {
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java
index 2d9ebc0..1d1d515 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java
@@ -79,6 +79,11 @@ public class StateQueryProcessor implements TridentProcessor 
{
     }
 
     @Override
+    public void flush() {
+        // NO-OP
+    }
+
+    @Override
     public void finishBatch(ProcessorContext processorContext) {
         BatchState state = (BatchState) 
processorContext.state[_context.getStateIndex()];
         if(!state.tuples.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java
index dd950d3..9f2a251 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -194,7 +194,12 @@ public class RichSpoutBatchExecutor implements 
ITridentSpout<Object> {
         public void emitDirect(int task, String stream, List<Object> values, 
Object id) {
             throw new UnsupportedOperationException("Trident does not support 
direct streams");
         }
-        
+
+        @Override
+        public void flush() {
+            _collector.flush();
+        }
+
         @Override
         public long getPendingCount() {
             return pendingCount;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java
 
b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java
index af98465..9447f7a 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -171,6 +171,11 @@ public class RichSpoutBatchTriggerer implements IRichSpout 
{
         }
 
         @Override
+        public void flush() {
+            _collector.flush();
+        }
+
+        @Override
         public void reportError(Throwable t) {
             _collector.reportError(t);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java 
b/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java
index b60a7bd..fa9827c 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java
@@ -131,6 +131,11 @@ public class TridentSpoutExecutor implements 
ITridentBatchBolt {
         }
 
         @Override
+        public void flush() {
+            _delegate.flush();
+        }
+
+        @Override
         public void reportError(Throwable t) {
             _delegate.reportError(t);
         }        

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
index 88d222c..a0b048f 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
@@ -185,7 +185,12 @@ public class TridentBoltExecutor implements IRichBolt {
         public void resetTimeout(Tuple tuple) {
             throw new IllegalStateException("Method should never be called");
         }
-        
+
+        @Override
+        public void flush() {
+            _delegate.flush();
+        }
+
         public void reportError(Throwable error) {
             _delegate.reportError(error);
         }
@@ -208,7 +213,7 @@ public class TridentBoltExecutor implements IRichBolt {
     TopologyContext _context;
     
     @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {        
+    public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
         _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
         _lastRotate = System.currentTimeMillis();
         _batches = new RotatingMap<>(2);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
 
b/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index e412eb1..672fad6 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -181,6 +181,11 @@ public abstract class AbstractTridentWindowManager<T> 
implements ITridentWindowM
         }
 
         @Override
+        public void flush() {
+            // NO-OP
+        }
+
+        @Override
         public void reportError(Throwable t) {
             delegateCollector.reportError(t);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
 
b/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index 721063a..c1f9d5b 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -148,6 +148,11 @@ public class WindowTridentProcessor implements 
TridentProcessor {
     }
 
     @Override
+    public void flush() {
+        // NO-OP
+    }
+
+    @Override
     public void finishBatch(ProcessorContext processorContext) {
 
         Object batchId = processorContext.batchId;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java 
b/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java
index 0ea1291..c7661a5 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java
@@ -17,10 +17,13 @@
  */
 package org.apache.storm.tuple;
 
+import org.apache.storm.Constants;
+import org.apache.storm.task.GeneralTopologyContext;
+
 /**
  * A Tuple that is addressed to a destination.
  */
-public class AddressedTuple {
+public final class AddressedTuple {
     /**
      * Destination used when broadcasting a tuple.
      */
@@ -45,4 +48,10 @@ public class AddressedTuple {
     public String toString() {
         return "[dest: "+dest+" tuple: "+tuple+"]";
     }
+
+    public static AddressedTuple createFlushTuple(GeneralTopologyContext 
workerTopologyContext) {
+        TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(), 
Constants.SYSTEM_COMPONENT_ID,
+            (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_FLUSH_STREAM_ID);
+        return new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); // 
one instance per executor avoids false sharing of CPU cache
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java 
b/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java
index 6883995..bce7946 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java
@@ -20,6 +20,7 @@ package org.apache.storm.tuple;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,14 +28,16 @@ import java.util.Random;
 import java.util.Set;
 
 public class MessageId {
+    final static MessageId unanchoredMsgId =  makeId(Collections.emptyMap());
+
     private Map<Long, Long> _anchorsToIds;
-    
+
     public static long generateId(Random rand) {
         return rand.nextLong();
     }
 
     public static MessageId makeUnanchored() {
-        return makeId(new HashMap<Long, Long>());
+        return unanchoredMsgId;
     }
         
     public static MessageId makeId(Map<Long, Long> anchorsToIds) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java 
b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
index 9356c4b..28dae6d 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
@@ -24,50 +24,55 @@ import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.GeneralTopologyContext;
 
 public class TupleImpl implements Tuple {
-    private final List<Object> values;
-    private final int taskId;
-    private final String streamId;
-    private final GeneralTopologyContext context;
-    private final MessageId id;
+    private List<Object> values;
+    private int taskId;
+    private String streamId;
+    private GeneralTopologyContext context;
+    private MessageId id;
+    private final String srcComponent;
     private Long _processSampleStartTime;
     private Long _executeSampleStartTime;
     private long _outAckVal = 0;
-    
+
     public TupleImpl(Tuple t) {
         this.values = t.getValues();
         this.taskId = t.getSourceTask();
         this.streamId = t.getSourceStreamId();
         this.id = t.getMessageId();
         this.context = t.getContext();
-        if (t instanceof TupleImpl) {
+        this.srcComponent = t.getSourceComponent();
+        try {
             TupleImpl ti = (TupleImpl) t;
             this._processSampleStartTime = ti._processSampleStartTime;
             this._executeSampleStartTime = ti._executeSampleStartTime;
             this._outAckVal = ti._outAckVal;
+        } catch (ClassCastException e) {
+            // ignore ... if t is not a TupleImpl type .. faster than checking 
and then casting
         }
     }
 
-    public TupleImpl(GeneralTopologyContext context, List<Object> values, int 
taskId, String streamId, MessageId id) {
+    public TupleImpl(GeneralTopologyContext context, List<Object> values, 
String srcComponent, int taskId, String streamId, MessageId id) {
         this.values = Collections.unmodifiableList(values);
         this.taskId = taskId;
         this.streamId = streamId;
         this.id = id;
         this.context = context;
-        
-        String componentId = context.getComponentId(taskId);
-        Fields schema = context.getComponentOutputFields(componentId, 
streamId);
-        if(values.size()!=schema.size()) {
-            throw new IllegalArgumentException(
-                    "Tuple created with wrong number of fields. " +
-                    "Expected " + schema.size() + " fields but got " +
-                    values.size() + " fields");
+        this.srcComponent = srcComponent;
+
+        if (context.doSanityCheck()) {
+            String componentId = context.getComponentId(taskId);
+            Fields schema = context.getComponentOutputFields(componentId, 
streamId);
+            if (values.size() != schema.size()) {
+                throw new IllegalArgumentException("Tuple created with wrong 
number of fields. Expected " + schema.size()
+                    + " fields but got " + values.size() + " fields");
+            }
         }
     }
 
-    public TupleImpl(GeneralTopologyContext context, List<Object> values, int 
taskId, String streamId) {
-        this(context, values, taskId, streamId, MessageId.makeUnanchored());
-    }
-    
+    public TupleImpl(GeneralTopologyContext context, List<Object> values, 
String srcComponent, int taskId, String streamId) {
+        this(context, values, srcComponent, taskId, streamId, 
MessageId.makeUnanchored());
+    }    
+
     public void setProcessSampleStartTime(long ms) {
         _processSampleStartTime = ms;
     }
@@ -83,7 +88,7 @@ public class TupleImpl implements Tuple {
     public Long getExecuteSampleStartTime() {
         return _executeSampleStartTime;
     }
-    
+
     public void updateAckVal(long val) {
         _outAckVal = _outAckVal ^ val;
     }
@@ -91,7 +96,7 @@ public class TupleImpl implements Tuple {
     public long getAckVal() {
         return _outAckVal;
     }
-    
+
     /** Tuple APIs*/
     @Override
     public int size() {
@@ -213,7 +218,7 @@ public class TupleImpl implements Tuple {
         return values;
     }
 
-    @Override    
+    @Override
     public Fields getFields() {
         return context.getComponentOutputFields(getSourceComponent(), 
getSourceStreamId());
     }
@@ -235,7 +240,7 @@ public class TupleImpl implements Tuple {
 
     @Override
     public String getSourceComponent() {
-        return context.getComponentId(taskId);
+        return srcComponent;
     }
 
     @Override
@@ -257,7 +262,7 @@ public class TupleImpl implements Tuple {
     public GeneralTopologyContext getContext() {
         return context;
     }
-    
+
     @Override
     public String toString() {
         return "source: " + getSourceComponent() + ":" + taskId + ", stream: " 
+ streamId + ", id: "+ id.toString() + ", " + values.toString() + " 
PROC_START_TIME(sampled): " + _processSampleStartTime + " 
EXEC_START_TIME(sampled): " + _executeSampleStartTime;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java 
b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 1bc94ac..0d5bab8 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
+import java.util.function.BooleanSupplier;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
@@ -144,19 +145,19 @@ public class ConfigUtils {
         throw new IllegalArgumentException("Illegal topology.stats.sample.rate 
in conf: " + rate);
     }
 
-    public static Callable<Boolean> mkStatsSampler(Map<String, Object> conf) {
+    public static BooleanSupplier mkStatsSampler(Map<String, Object> conf) {
         return evenSampler(samplingRate(conf));
     }
 
-    public static Callable<Boolean> evenSampler(final int samplingFreq) {
+    public static BooleanSupplier evenSampler(final int samplingFreq) {
         final Random random = new Random();
 
-        return new Callable<Boolean>() {
+        return new BooleanSupplier() {
             private int curr = -1;
             private int target = random.nextInt(samplingFreq);
 
             @Override
-            public Boolean call() throws Exception {
+            public boolean getAsBoolean() {
                 curr++;
                 if (curr >= samplingFreq) {
                     curr = 0;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java
 
b/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java
deleted file mode 100644
index 69be7be..0000000
--- 
a/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.storm.utils;
-
-public interface DisruptorBackpressureCallback {
-
-    void highWaterMark() throws Exception;
-
-    void lowWaterMark() throws Exception;
-}

Reply via email to